You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by su...@apache.org on 2021/02/23 01:57:45 UTC

[hbase] 05/10: HBASE-24982 Disassemble the method replicateWALEntry from AdminService to a new interface ReplicationServerService (#2360)

This is an automated email from the ASF dual-hosted git repository.

sunxin pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 1f11ee4ad71b032555b2e46f56d744ccf621fa61
Author: XinSun <dd...@gmail.com>
AuthorDate: Wed Sep 9 15:00:37 2020 +0800

    HBASE-24982 Disassemble the method replicateWALEntry from AdminService to a new interface ReplicationServerService (#2360)
    
    Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
 .../hadoop/hbase/client/AsyncConnectionImpl.java   |  16 ++
 .../server/replication/ReplicationServer.proto     |  32 ++++
 .../hadoop/hbase/replication/ReplicationUtils.java |  19 ++
 .../hbase/client/AsyncClusterConnection.java       |   5 +
 .../hbase/client/AsyncClusterConnectionImpl.java   |   5 +
 .../hbase/client/AsyncReplicationServerAdmin.java  |  80 +++++++++
 .../hbase/protobuf/ReplicationProtobufUtil.java    |  18 ++
 .../hadoop/hbase/regionserver/RSRpcServices.java   |   8 +-
 .../replication/HBaseReplicationEndpoint.java      |  57 +++++-
 .../replication/ReplicationServerRpcServices.java  | 200 +--------------------
 .../HBaseInterClusterReplicationEndpoint.java      |   7 +-
 .../hbase/client/DummyAsyncClusterConnection.java  |   5 +
 .../replication/TestHBaseReplicationEndpoint.java  |  17 +-
 .../hbase/replication/TestReplicationServer.java   |  43 ++++-
 14 files changed, 288 insertions(+), 224 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 8a1ac5a..5a332d8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminServic
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
 
 /**
  * The implementation of AsyncConnection.
@@ -101,6 +102,8 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
   private final ConcurrentMap<String, AdminService.Interface> adminSubs = new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, ReplicationServerService.Interface> replStubs =
+      new ConcurrentHashMap<>();
 
   private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>();
 
@@ -278,12 +281,25 @@ class AsyncConnectionImpl implements AsyncConnection {
     return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
   }
 
+  private ReplicationServerService.Interface createReplicationServerStub(ServerName serverName)
+      throws IOException {
+    return ReplicationServerService.newStub(
+        rpcClient.createRpcChannel(serverName, user, rpcTimeout));
+  }
+
   AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
     return ConcurrentMapUtils.computeIfAbsentEx(adminSubs,
       getStubKey(AdminService.getDescriptor().getName(), serverName),
       () -> createAdminServerStub(serverName));
   }
 
+  ReplicationServerService.Interface getReplicationServerStub(ServerName serverName)
+      throws IOException {
+    return ConcurrentMapUtils.computeIfAbsentEx(replStubs,
+        getStubKey(ReplicationServerService.Interface.class.getSimpleName(), serverName,
+            hostnameCanChange), () -> createReplicationServerStub(serverName));
+  }
+
   CompletableFuture<MasterService.Interface> getMasterStub() {
     return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> {
       CompletableFuture<MasterService.Interface> future = new CompletableFuture<>();
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto b/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto
new file mode 100644
index 0000000..ed334c4
--- /dev/null
+++ b/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto2";
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
+option java_outer_classname = "ReplicationServerProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+import "server/region/Admin.proto";
+
+service ReplicationServerService {
+  rpc ReplicateWALEntry(ReplicateWALEntryRequest)
+    returns(ReplicateWALEntryResponse);
+}
\ No newline at end of file
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index a786206..7bafbc2 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -30,6 +30,9 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncAdmin;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -212,4 +215,20 @@ public final class ReplicationUtils {
     }
     return initialValue * HConstants.RETRY_BACKOFF[ntries];
   }
+
+  /**
+   * Check whether peer cluster supports replication offload.
+   * @param peerConn connection for peer cluster
+   * @return true if peer cluster version >= 3
+   * @throws IOException exception
+   */
+  public static boolean isPeerClusterSupportReplicationOffload(AsyncConnection peerConn)
+      throws IOException {
+    AsyncAdmin admin = peerConn.getAdmin();
+    String version = FutureUtils.get(admin.getClusterMetrics()).getHBaseVersion();
+    if (Integer.parseInt(version.split("\\.")[0]) >= 3) {
+      return true;
+    }
+    return false;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index 92118ac..b6a3b97 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -42,6 +42,11 @@ public interface AsyncClusterConnection extends AsyncConnection {
   AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName);
 
   /**
+   * Get the admin service for the give replication server.
+   */
+  AsyncReplicationServerAdmin getReplicationServerAdmin(ServerName serverName);
+
+  /**
    * Get the nonce generator for this connection.
    */
   NonceGenerator getNonceGenerator();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
index 39fc3a2..e4c2ee3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
@@ -71,6 +71,11 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu
   }
 
   @Override
+  public AsyncReplicationServerAdmin getReplicationServerAdmin(ServerName serverName) {
+    return new AsyncReplicationServerAdmin(serverName, this);
+  }
+
+  @Override
   public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
       boolean writeFlushWALMarker) {
     RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncReplicationServerAdmin.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncReplicationServerAdmin.java
new file mode 100644
index 0000000..7511a64
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncReplicationServerAdmin.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
+
+/**
+ * A simple wrapper of the {@link ReplicationServerService} for a replication server.
+ * <p/>
+ * Notice that there is no retry, and this is intentional.
+ */
+@InterfaceAudience.Private
+public class AsyncReplicationServerAdmin {
+
+  private final ServerName server;
+
+  private final AsyncConnectionImpl conn;
+
+  AsyncReplicationServerAdmin(ServerName server, AsyncConnectionImpl conn) {
+    this.server = server;
+    this.conn = conn;
+  }
+
+  @FunctionalInterface
+  private interface RpcCall<RESP> {
+    void call(ReplicationServerService.Interface stub, HBaseRpcController controller,
+        RpcCallback<RESP> done);
+  }
+
+  private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall, CellScanner cellScanner) {
+    CompletableFuture<RESP> future = new CompletableFuture<>();
+    HBaseRpcController controller = conn.rpcControllerFactory.newController(cellScanner);
+    try {
+      rpcCall.call(conn.getReplicationServerStub(server), controller, resp -> {
+        if (controller.failed()) {
+          future.completeExceptionally(controller.getFailed());
+        } else {
+          future.complete(resp);
+        }
+      });
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+
+  public CompletableFuture<AdminProtos.ReplicateWALEntryResponse> replicateWALEntry(
+      AdminProtos.ReplicateWALEntryRequest request, CellScanner cellScanner, int timeout) {
+    return call((stub, controller, done) -> {
+      controller.setCallTimeout(timeout);
+      stub.replicateWALEntry(controller, request, done);
+    }, cellScanner);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
index e47c929..17f48a6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
 import org.apache.hadoop.hbase.io.SizedCellScanner;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.hadoop.hbase.util.FutureUtils;
@@ -62,6 +63,23 @@ public class ReplicationProtobufUtil {
   }
 
   /**
+   * A helper to replicate a list of WAL entries using replication server admin
+   * @param admin the replication server admin
+   * @param entries Array of WAL entries to be replicated
+   * @param replicationClusterId Id which will uniquely identify source cluster FS client
+   *          configurations in the replication configuration directory
+   * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
+   * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
+   */
+  public static void replicateWALEntry(AsyncReplicationServerAdmin admin, Entry[] entries,
+      String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir,
+      int timeout) throws IOException {
+    Pair<ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(entries, null,
+        replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
+    FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout));
+  }
+
+  /**
    * Create a new ReplicateWALEntryRequest from a list of WAL entries
    * @param entries the WAL entries to be replicated
    * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values found.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 54395c3..a4a735b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -257,6 +257,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuo
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
@@ -1566,8 +1567,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     }
     if (admin) {
       bssi.add(new BlockingServiceAndInterface(
-      AdminService.newReflectiveBlockingService(this),
-      AdminService.BlockingInterface.class));
+          AdminService.newReflectiveBlockingService(this),
+          AdminService.BlockingInterface.class));
+      bssi.add(new BlockingServiceAndInterface(
+          ReplicationServerService.newReflectiveBlockingService(this),
+          ReplicationServerService.BlockingInterface.class));
     }
     return new org.apache.hbase.thirdparty.com.google.common.collect.
         ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
index 8678685..f38fd08 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
@@ -26,11 +26,15 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
 import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
+import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.zookeeper.ZKListener;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.ServerName;
@@ -278,7 +282,7 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
     }
     ServerName serverName =
       sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size()));
-    return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName));
+    return createSinkPeer(serverName);
   }
 
   /**
@@ -340,21 +344,60 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
   /**
    * Wraps a replication region server sink to provide the ability to identify it.
    */
-  public static class SinkPeer {
+  public static abstract class SinkPeer {
     private ServerName serverName;
-    private AsyncRegionServerAdmin regionServer;
 
-    public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) {
+    public SinkPeer(ServerName serverName) {
       this.serverName = serverName;
-      this.regionServer = regionServer;
     }
 
     ServerName getServerName() {
       return serverName;
     }
 
-    public AsyncRegionServerAdmin getRegionServer() {
-      return regionServer;
+    public abstract void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId,
+      Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) throws IOException;
+  }
+
+  public static class RegionServerSinkPeer extends SinkPeer {
+
+    private AsyncRegionServerAdmin regionServer;
+
+    public RegionServerSinkPeer(ServerName serverName,
+      AsyncRegionServerAdmin replicationServer) {
+      super(serverName);
+      this.regionServer = replicationServer;
+    }
+
+    public void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId,
+      Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) throws IOException {
+      ReplicationProtobufUtil.replicateWALEntry(regionServer, entries, replicationClusterId,
+        sourceBaseNamespaceDir, sourceHFileArchiveDir, timeout);
+    }
+  }
+
+  public static class ReplicationServerSinkPeer extends SinkPeer {
+
+    private AsyncReplicationServerAdmin replicationServer;
+
+    public ReplicationServerSinkPeer(ServerName serverName,
+      AsyncReplicationServerAdmin replicationServer) {
+      super(serverName);
+      this.replicationServer = replicationServer;
+    }
+
+    public void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId,
+      Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) throws IOException {
+      ReplicationProtobufUtil.replicateWALEntry(replicationServer, entries, replicationClusterId,
+        sourceBaseNamespaceDir, sourceHFileArchiveDir, timeout);
+    }
+  }
+
+  private SinkPeer createSinkPeer(ServerName serverName) throws IOException {
+    if (ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) {
+      return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName));
+    } else {
+      return new RegionServerSinkPeer(serverName, conn.getRegionServerAdmin(serverName));
     }
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
index 1b9b699..15d4f8c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
@@ -27,14 +27,12 @@ import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.PriorityFunction;
-import org.apache.hadoop.hbase.ipc.QosPriority;
 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
 import org.apache.hadoop.hbase.ipc.RpcServerFactory;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
@@ -58,53 +56,11 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
@@ -117,7 +73,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 @InterfaceAudience.Private
 @SuppressWarnings("deprecation")
 public class ReplicationServerRpcServices implements HBaseRPCErrorHandler,
-    AdminService.BlockingInterface, PriorityFunction {
+    ReplicationServerService.BlockingInterface, PriorityFunction {
 
   protected static final Logger LOG = LoggerFactory.getLogger(ReplicationServerRpcServices.class);
 
@@ -256,8 +212,8 @@ public class ReplicationServerRpcServices implements HBaseRPCErrorHandler,
   protected List<BlockingServiceAndInterface> getServices() {
     List<BlockingServiceAndInterface> bssi = new ArrayList<>();
     bssi.add(new BlockingServiceAndInterface(
-      AdminService.newReflectiveBlockingService(this),
-      AdminService.BlockingInterface.class));
+      ReplicationServerService.newReflectiveBlockingService(this),
+        ReplicationServerService.BlockingInterface.class));
     return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
   }
 
@@ -325,154 +281,6 @@ public class ReplicationServerRpcServices implements HBaseRPCErrorHandler,
     }
   }
 
-  @Override
-  public GetRegionInfoResponse getRegionInfo(RpcController controller, GetRegionInfoRequest request)
-      throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public GetStoreFileResponse getStoreFile(RpcController controller, GetStoreFileRequest request)
-      throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public GetOnlineRegionResponse getOnlineRegion(RpcController controller,
-      GetOnlineRegionRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public OpenRegionResponse openRegion(RpcController controller, OpenRegionRequest request)
-      throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public WarmupRegionResponse warmupRegion(RpcController controller, WarmupRegionRequest request)
-      throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public CloseRegionResponse closeRegion(RpcController controller, CloseRegionRequest request)
-      throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public FlushRegionResponse flushRegion(RpcController controller, FlushRegionRequest request)
-      throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public CompactionSwitchResponse compactionSwitch(RpcController controller,
-      CompactionSwitchRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public CompactRegionResponse compactRegion(RpcController controller,
-      CompactRegionRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public ReplicateWALEntryResponse replay(RpcController controller,
-      ReplicateWALEntryRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public RollWALWriterResponse rollWALWriter(RpcController controller, RollWALWriterRequest request)
-      throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public GetServerInfoResponse getServerInfo(RpcController controller, GetServerInfoRequest request)
-      throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  /**
-   * Stop the replication server.
-   *
-   * @param controller the RPC controller
-   * @param request the request
-   */
-  @Override
-  @QosPriority(priority=HConstants.ADMIN_QOS)
-  public StopServerResponse stopServer(final RpcController controller,
-      final StopServerRequest request) {
-    requestCount.increment();
-    String reason = request.getReason();
-    replicationServer.stop(reason);
-    return StopServerResponse.newBuilder().build();
-  }
-
-  @Override
-  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
-      UpdateFavoredNodesRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public UpdateConfigurationResponse updateConfiguration(RpcController controller,
-      UpdateConfigurationRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public GetRegionLoadResponse getRegionLoad(RpcController controller,
-      GetRegionLoadRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
-      ClearCompactionQueuesRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
-      ClearRegionBlockCacheRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
-      GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public ExecuteProceduresResponse executeProcedures(RpcController controller,
-      ExecuteProceduresRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public SlowLogResponses getSlowLogResponses(RpcController controller,
-      SlowLogResponseRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public SlowLogResponses getLargeLogResponses(RpcController controller,
-      SlowLogResponseRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public ClearSlowLogResponses clearSlowLogsResponses(RpcController controller,
-      ClearSlowLogResponseRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
   protected AccessChecker getAccessChecker() {
     return accessChecker;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index c77f74f..d8517b0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
 import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
@@ -561,11 +560,9 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
           logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
       }
       sinkPeer = getReplicationSink();
-      AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
       try {
-        ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
-          entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir,
-          hfileArchiveDir, timeout);
+        sinkPeer.replicateWALEntry(entries.toArray(new Entry[entries.size()]), replicationClusterId,
+            baseNamespaceDir, hfileArchiveDir, timeout);
         if (LOG.isTraceEnabled()) {
           LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
         }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
index 8755749..5af4086 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
@@ -109,6 +109,11 @@ public class DummyAsyncClusterConnection implements AsyncClusterConnection {
   }
 
   @Override
+  public AsyncReplicationServerAdmin getReplicationServerAdmin(ServerName serverName) {
+    return null;
+  }
+
+  @Override
   public NonceGenerator getNonceGenerator() {
     return null;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
index 4160141..4182eaf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
@@ -28,7 +28,8 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
-import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
+import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.ReplicationServerSinkPeer;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.SinkPeer;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -99,7 +100,7 @@ public class TestHBaseReplicationEndpoint {
     // Sanity check
     assertEquals(1, endpoint.getNumSinks());
 
-    SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
+    SinkPeer sinkPeer = mockSinkPeer(serverNameA);
     endpoint.reportBadSink(sinkPeer);
     // Just reporting a bad sink once shouldn't have an effect
     assertEquals(1, endpoint.getNumSinks());
@@ -123,7 +124,7 @@ public class TestHBaseReplicationEndpoint {
     assertEquals(expected, endpoint.getNumSinks());
 
     ServerName badSinkServer0 = endpoint.getSinkServers().get(0);
-    SinkPeer sinkPeer = new SinkPeer(badSinkServer0, mock(AsyncRegionServerAdmin.class));
+    SinkPeer sinkPeer = mockSinkPeer(badSinkServer0);
     for (int i = 0; i <= HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) {
       endpoint.reportBadSink(sinkPeer);
     }
@@ -133,7 +134,7 @@ public class TestHBaseReplicationEndpoint {
 
     // now try a sink that has some successes
     ServerName badSinkServer1 = endpoint.getSinkServers().get(0);
-    sinkPeer = new SinkPeer(badSinkServer1, mock(AsyncRegionServerAdmin.class));
+    sinkPeer = mockSinkPeer(badSinkServer1);
     for (int i = 0; i < HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) {
       endpoint.reportBadSink(sinkPeer);
     }
@@ -168,8 +169,8 @@ public class TestHBaseReplicationEndpoint {
     ServerName serverNameA = endpoint.getSinkServers().get(0);
     ServerName serverNameB = endpoint.getSinkServers().get(1);
 
-    SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
-    SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class));
+    SinkPeer sinkPeerA = mockSinkPeer(serverNameA);
+    SinkPeer sinkPeerB = mockSinkPeer(serverNameB);
 
     for (int i = 0; i <= HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) {
       endpoint.reportBadSink(sinkPeerA);
@@ -207,4 +208,8 @@ public class TestHBaseReplicationEndpoint {
       return null;
     }
   }
+
+  private SinkPeer mockSinkPeer(ServerName serverName) {
+    return new ReplicationServerSinkPeer(serverName, mock(AsyncReplicationServerAdmin.class));
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
index 6a0ef3d..0ef23f2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
@@ -30,14 +30,15 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
-import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
+import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.ReplicationServerSinkPeer;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -116,22 +117,48 @@ public class TestReplicationServer {
     TEST_UTIL.deleteTableIfAny(TABLENAME);
   }
 
+  /**
+   * Requests replication server using {@link AsyncReplicationServerAdmin}
+   */
   @Test
   public void testReplicateWAL() throws Exception {
-    AsyncClusterConnection conn = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().get(0)
-        .getRegionServer().getAsyncClusterConnection();
-    AsyncRegionServerAdmin rsAdmin = conn.getRegionServerAdmin(replicationServer.getServerName());
+    AsyncClusterConnection conn =
+        TEST_UTIL.getHBaseCluster().getMaster().getAsyncClusterConnection();
+    AsyncReplicationServerAdmin replAdmin =
+        conn.getReplicationServerAdmin(replicationServer.getServerName());
+
+    ReplicationServerSinkPeer sinkPeer =
+        new ReplicationServerSinkPeer(replicationServer.getServerName(), replAdmin);
+    replicateWALEntryAndVerify(sinkPeer);
+  }
+
+  /**
+   * Requests region server using {@link AsyncReplicationServerAdmin}
+   */
+  @Test
+  public void testReplicateWAL2() throws Exception {
+    AsyncClusterConnection conn =
+        TEST_UTIL.getHBaseCluster().getMaster().getAsyncClusterConnection();
+    ServerName rs = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().get(0)
+        .getRegionServer().getServerName();
+    AsyncReplicationServerAdmin replAdmin = conn.getReplicationServerAdmin(rs);
+
+    ReplicationServerSinkPeer
+      sinkPeer = new ReplicationServerSinkPeer(rs, replAdmin);
+    replicateWALEntryAndVerify(sinkPeer);
+  }
 
+  private void replicateWALEntryAndVerify(ReplicationServerSinkPeer sinkPeer) throws Exception {
     Entry[] entries = new Entry[BATCH_SIZE];
     for(int i = 0; i < BATCH_SIZE; i++) {
       entries[i] = generateEdit(i, TABLENAME, Bytes.toBytes(i));
     }
 
-    ReplicationProtbufUtil.replicateWALEntry(rsAdmin, entries, replicationClusterId,
-        baseNamespaceDir, hfileArchiveDir, 1000);
+    sinkPeer.replicateWALEntry(entries, replicationClusterId, baseNamespaceDir, hfileArchiveDir,
+        1000);
 
+    Table table = TEST_UTIL.getConnection().getTable(TABLENAME);
     for (int i = 0; i < BATCH_SIZE; i++) {
-      Table table = TEST_UTIL.getConnection().getTable(TABLENAME);
       Result result = table.get(new Get(Bytes.toBytes(i)));
       Cell cell = result.getColumnLatestCell(Bytes.toBytes(FAMILY), Bytes.toBytes(FAMILY));
       assertNotNull(cell);