Posted to commits@hbase.apache.org by zg...@apache.org on 2020/09/29 00:43:15 UTC

[hbase] branch HBASE-24666 updated (662f5e1 -> 827ac96)

This is an automated email from the ASF dual-hosted git repository.

zghao pushed a change to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git.


 discard 662f5e1  HBASE-24684 Fetch ReplicationSink servers list from HMaster instead o… (#2077)
 discard 0dc4d5d  HBASE-24998 Introduce a ReplicationSourceOverallController interface and decouple ReplicationSourceManager and ReplicationSource (#2364)
 discard 24fc4d4  HBASE-24982 Disassemble the method replicateWALEntry from AdminService to a new interface ReplicationServerService (#2360)
 discard e9c131c  HBASE-24683 Add a basic ReplicationServer which only implement ReplicationSink Service (#2111)
 discard 43b7be1  HBASE-24735: Refactor ReplicationSourceManager: move logPositionAndCleanOldLogs/cleanUpHFileRefs to ReplicationSource inside (#2064)
 discard 38acdaa  HBASE-24681 Remove the cache walsById/walsByIdRecoveredQueues from ReplicationSourceManager (#2019)
 discard 1d4584f  HBASE-24682 Refactor ReplicationSource#addHFileRefs method: move it to ReplicationSourceManager (#2020)
     add c312760  HBASE-25045 : Add 2.3.2 to the downloads page
     add b57bef5  HBASE-25077: hbck.jsp page loading fails, logs NPE in master log. (#2433)
     add 86557ed  HBASE-25096 WAL size in RegionServer UI is wrong (#2456)
     add 80ffac2  HBASE-25070 : With new generic API getLogEntries, cleaning up unused RPC APIs
     add fbef545  HBASE-25017 Migrate flaky reporting jenkins job from Hadoop to hbase (#2466)
     add 68b56be  HBASE-25100 conf and conn are assigned twice in HBaseReplicationEndpoint and HBaseInterClusterReplicationEndpoint (#2463)
     new c315e68  HBASE-24682 Refactor ReplicationSource#addHFileRefs method: move it to ReplicationSourceManager (#2020)
     new 5646555  HBASE-24681 Remove the cache walsById/walsByIdRecoveredQueues from ReplicationSourceManager (#2019)
     new 5a62429  HBASE-24735: Refactor ReplicationSourceManager: move logPositionAndCleanOldLogs/cleanUpHFileRefs to ReplicationSource inside (#2064)
     new 6625e92  HBASE-24683 Add a basic ReplicationServer which only implement ReplicationSink Service (#2111)
     new 56cd97f  HBASE-24982 Disassemble the method replicateWALEntry from AdminService to a new interface ReplicationServerService (#2360)
     new 17621c8  HBASE-24998 Introduce a ReplicationSourceOverallController interface and decouple ReplicationSourceManager and ReplicationSource (#2364)
     new 827ac96  HBASE-24684 Fetch ReplicationSink servers list from HMaster instead o… (#2077)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (662f5e1)
            \
             N -- N -- N   refs/heads/HBASE-24666 (827ac96)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flaky-tests/flaky-reporting.Jenkinsfile        |  2 +-
 .../java/org/apache/hadoop/hbase/client/Admin.java |  4 +-
 .../org/apache/hadoop/hbase/client/AsyncAdmin.java |  4 +-
 .../src/main/protobuf/server/region/Admin.proto    |  6 ---
 .../org/apache/hadoop/hbase/master/HbckChore.java  |  6 +--
 .../MetricsRegionServerWrapperImpl.java            |  2 +-
 .../hadoop/hbase/regionserver/RSRpcServices.java   | 27 +---------
 .../replication/HBaseReplicationEndpoint.java      | 63 +++++++++++++++-------
 .../HBaseInterClusterReplicationEndpoint.java      | 34 +-----------
 .../hadoop/hbase/master/MockRegionServer.java      | 14 -----
 src/site/xdoc/downloads.xml                        | 16 +++---
 11 files changed, 62 insertions(+), 116 deletions(-)


[hbase] 05/07: HBASE-24982 Disassemble the method replicateWALEntry from AdminService to a new interface ReplicationServerService (#2360)

Posted by zg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 56cd97fbd8db99420eff5a6772c4bbda1ac28dfc
Author: XinSun <dd...@gmail.com>
AuthorDate: Wed Sep 9 15:00:37 2020 +0800

    HBASE-24982 Disassemble the method replicateWALEntry from AdminService to a new interface ReplicationServerService (#2360)
    
    Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
 .../hadoop/hbase/client/AsyncConnectionImpl.java   |  16 ++
 .../server/replication/ReplicationServer.proto     |  32 ++++
 .../hadoop/hbase/replication/ReplicationUtils.java |  19 ++
 .../hbase/client/AsyncClusterConnection.java       |   5 +
 .../hbase/client/AsyncClusterConnectionImpl.java   |   5 +
 .../hbase/client/AsyncReplicationServerAdmin.java  |  80 +++++++++
 .../hbase/protobuf/ReplicationProtobufUtil.java    |  18 ++
 .../hadoop/hbase/regionserver/RSRpcServices.java   |  10 +-
 .../replication/HBaseReplicationEndpoint.java      |  57 +++++-
 .../replication/ReplicationServerRpcServices.java  | 200 +--------------------
 .../HBaseInterClusterReplicationEndpoint.java      |   7 +-
 .../regionserver/ReplicationSource.java            |   2 +-
 .../hbase/client/DummyAsyncClusterConnection.java  |   5 +
 .../replication/TestHBaseReplicationEndpoint.java  |  17 +-
 .../hbase/replication/TestReplicationServer.java   |  43 ++++-
 .../regionserver/TestReplicationSource.java        |   4 +-
 16 files changed, 292 insertions(+), 228 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 406af0d..e8f0fb0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminServic
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
 
 /**
  * The implementation of AsyncConnection.
@@ -107,6 +108,8 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
   private final ConcurrentMap<String, AdminService.Interface> adminSubs = new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, ReplicationServerService.Interface> replStubs =
+      new ConcurrentHashMap<>();
 
   private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>();
 
@@ -266,12 +269,25 @@ class AsyncConnectionImpl implements AsyncConnection {
     return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
   }
 
+  private ReplicationServerService.Interface createReplicationServerStub(ServerName serverName)
+      throws IOException {
+    return ReplicationServerService.newStub(
+        rpcClient.createRpcChannel(serverName, user, rpcTimeout));
+  }
+
   AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
     return ConcurrentMapUtils.computeIfAbsentEx(adminSubs,
       getStubKey(AdminService.getDescriptor().getName(), serverName, hostnameCanChange),
       () -> createAdminServerStub(serverName));
   }
 
+  ReplicationServerService.Interface getReplicationServerStub(ServerName serverName)
+      throws IOException {
+    return ConcurrentMapUtils.computeIfAbsentEx(replStubs,
+        getStubKey(ReplicationServerService.Interface.class.getSimpleName(), serverName,
+            hostnameCanChange), () -> createReplicationServerStub(serverName));
+  }
+
   CompletableFuture<MasterService.Interface> getMasterStub() {
     return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> {
       CompletableFuture<MasterService.Interface> future = new CompletableFuture<>();
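A note on the new replStubs map above: it follows the same per-server caching pattern as the existing admin stubs, building a stub once per (service, server) key and reusing it on later calls. A minimal sketch of that pattern with plain ConcurrentHashMap (names hypothetical; the patch itself uses ConcurrentMapUtils.computeIfAbsentEx because stub creation can throw IOException):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    class StubCachePattern {
      // Hypothetical stand-in for a generated protobuf stub type.
      interface Stub { }

      private final ConcurrentMap<String, Stub> stubs = new ConcurrentHashMap<>();

      // First call for a key creates the stub; later calls return the cached one.
      Stub getStub(String key) {
        return stubs.computeIfAbsent(key, k -> createStub(k));
      }

      private Stub createStub(String key) {
        return new Stub() { };  // real code would open an RPC channel here
      }
    }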
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto b/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto
new file mode 100644
index 0000000..ed334c4
--- /dev/null
+++ b/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto2";
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
+option java_outer_classname = "ReplicationServerProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+import "server/region/Admin.proto";
+
+service ReplicationServerService {
+  rpc ReplicateWALEntry(ReplicateWALEntryRequest)
+    returns(ReplicateWALEntryResponse);
+}
\ No newline at end of file
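Since the service is declared with java_generic_services = true, protoc emits both asynchronous and blocking entry points, and the rest of this patch uses both. A brief sketch of how they are wired together (types and variables as they appear elsewhere in this patch, not the generated source itself):

    // Client side: async stub over an RPC channel, as in AsyncConnectionImpl above.
    ReplicationServerService.Interface stub =
        ReplicationServerService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));

    // Server side: wrap a BlockingInterface implementation for the RPC server,
    // as RSRpcServices and ReplicationServerRpcServices do below.
    BlockingService service = ReplicationServerService.newReflectiveBlockingService(impl);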
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index a786206..7bafbc2 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -30,6 +30,9 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncAdmin;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -212,4 +215,20 @@ public final class ReplicationUtils {
     }
     return initialValue * HConstants.RETRY_BACKOFF[ntries];
   }
+
+  /**
+   * Check whether peer cluster supports replication offload.
+   * @param peerConn connection for peer cluster
+   * @return true if peer cluster version >= 3
+   * @throws IOException exception
+   */
+  public static boolean isPeerClusterSupportReplicationOffload(AsyncConnection peerConn)
+      throws IOException {
+    AsyncAdmin admin = peerConn.getAdmin();
+    String version = FutureUtils.get(admin.getClusterMetrics()).getHBaseVersion();
+    if (Integer.parseInt(version.split("\\.")[0]) >= 3) {
+      return true;
+    }
+    return false;
+  }
 }
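Note that isPeerClusterSupportReplicationOffload keys only on the major component of the peer's version string. A standalone sketch of the same parsing (hypothetical helper name):

    // True when an HBase version string such as "2.4.1" or "3.0.0-alpha-1"
    // has a major version of 3 or higher.
    static boolean majorVersionAtLeast3(String hbaseVersion) {
      return Integer.parseInt(hbaseVersion.split("\\.")[0]) >= 3;
    }

    // majorVersionAtLeast3("2.4.1")          -> false
    // majorVersionAtLeast3("3.0.0-alpha-1")  -> true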
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index 92118ac..b6a3b97 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -42,6 +42,11 @@ public interface AsyncClusterConnection extends AsyncConnection {
   AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName);
 
   /**
+   * Get the admin service for the given replication server.
+   */
+  AsyncReplicationServerAdmin getReplicationServerAdmin(ServerName serverName);
+
+  /**
    * Get the nonce generator for this connection.
    */
   NonceGenerator getNonceGenerator();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
index 39fc3a2..e4c2ee3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
@@ -71,6 +71,11 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu
   }
 
   @Override
+  public AsyncReplicationServerAdmin getReplicationServerAdmin(ServerName serverName) {
+    return new AsyncReplicationServerAdmin(serverName, this);
+  }
+
+  @Override
   public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
       boolean writeFlushWALMarker) {
     RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncReplicationServerAdmin.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncReplicationServerAdmin.java
new file mode 100644
index 0000000..7511a64
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncReplicationServerAdmin.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
+
+/**
+ * A simple wrapper of the {@link ReplicationServerService} for a replication server.
+ * <p/>
+ * Notice that there is no retry, and this is intentional.
+ */
+@InterfaceAudience.Private
+public class AsyncReplicationServerAdmin {
+
+  private final ServerName server;
+
+  private final AsyncConnectionImpl conn;
+
+  AsyncReplicationServerAdmin(ServerName server, AsyncConnectionImpl conn) {
+    this.server = server;
+    this.conn = conn;
+  }
+
+  @FunctionalInterface
+  private interface RpcCall<RESP> {
+    void call(ReplicationServerService.Interface stub, HBaseRpcController controller,
+        RpcCallback<RESP> done);
+  }
+
+  private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall, CellScanner cellScanner) {
+    CompletableFuture<RESP> future = new CompletableFuture<>();
+    HBaseRpcController controller = conn.rpcControllerFactory.newController(cellScanner);
+    try {
+      rpcCall.call(conn.getReplicationServerStub(server), controller, resp -> {
+        if (controller.failed()) {
+          future.completeExceptionally(controller.getFailed());
+        } else {
+          future.complete(resp);
+        }
+      });
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+
+  public CompletableFuture<AdminProtos.ReplicateWALEntryResponse> replicateWALEntry(
+      AdminProtos.ReplicateWALEntryRequest request, CellScanner cellScanner, int timeout) {
+    return call((stub, controller, done) -> {
+      controller.setCallTimeout(timeout);
+      stub.replicateWALEntry(controller, request, done);
+    }, cellScanner);
+  }
+}
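A caller obtains the wrapper from the cluster connection and waits on the returned future; since the wrapper intentionally does no retries, failures surface directly. A minimal usage sketch (server name, request, and timeout hypothetical):

    AsyncReplicationServerAdmin admin = conn.getReplicationServerAdmin(serverName);
    CompletableFuture<AdminProtos.ReplicateWALEntryResponse> future =
        admin.replicateWALEntry(request, cellScanner, 60000);
    // FutureUtils.get blocks on the async result and rethrows failures as IOException.
    AdminProtos.ReplicateWALEntryResponse response = FutureUtils.get(future);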
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
index e47c929..17f48a6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
 import org.apache.hadoop.hbase.io.SizedCellScanner;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.hadoop.hbase.util.FutureUtils;
@@ -62,6 +63,23 @@ public class ReplicationProtobufUtil {
   }
 
   /**
+   * A helper to replicate a list of WAL entries using replication server admin
+   * @param admin the replication server admin
+   * @param entries Array of WAL entries to be replicated
+   * @param replicationClusterId Id which will uniquely identify source cluster FS client
+   *          configurations in the replication configuration directory
+   * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
+   * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
+   */
+  public static void replicateWALEntry(AsyncReplicationServerAdmin admin, Entry[] entries,
+      String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir,
+      int timeout) throws IOException {
+    Pair<ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(entries, null,
+        replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
+    FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout));
+  }
+
+  /**
    * Create a new ReplicateWALEntryRequest from a list of WAL entries
    * @param entries the WAL entries to be replicated
    * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values found.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index a59f5e6..bc138c4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -257,6 +257,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuo
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
@@ -270,7 +271,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe
 @SuppressWarnings("deprecation")
 public class RSRpcServices implements HBaseRPCErrorHandler,
     AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction,
-    ConfigurationObserver {
+    ConfigurationObserver, ReplicationServerService.BlockingInterface {
   protected static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);
 
   /** RPC scheduler to use for the region server. */
@@ -1488,8 +1489,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     }
     if (admin) {
       bssi.add(new BlockingServiceAndInterface(
-      AdminService.newReflectiveBlockingService(this),
-      AdminService.BlockingInterface.class));
+          AdminService.newReflectiveBlockingService(this),
+          AdminService.BlockingInterface.class));
+      bssi.add(new BlockingServiceAndInterface(
+          ReplicationServerService.newReflectiveBlockingService(this),
+          ReplicationServerService.BlockingInterface.class));
     }
     return new org.apache.hbase.thirdparty.com.google.common.collect.
         ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
index b08c990..8d60f23 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
@@ -27,11 +27,15 @@ import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
 import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
+import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.zookeeper.ZKListener;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.Abortable;
@@ -280,7 +284,7 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
     }
     ServerName serverName =
       sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size()));
-    return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName));
+    return createSinkPeer(serverName);
   }
 
   /**
@@ -343,21 +347,60 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
   /**
    * Wraps a replication region server sink to provide the ability to identify it.
    */
-  public static class SinkPeer {
+  public static abstract class SinkPeer {
     private ServerName serverName;
-    private AsyncRegionServerAdmin regionServer;
 
-    public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) {
+    public SinkPeer(ServerName serverName) {
       this.serverName = serverName;
-      this.regionServer = regionServer;
     }
 
     ServerName getServerName() {
       return serverName;
     }
 
-    public AsyncRegionServerAdmin getRegionServer() {
-      return regionServer;
+    public abstract void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId,
+      Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) throws IOException;
+  }
+
+  public static class RegionServerSinkPeer extends SinkPeer {
+
+    private AsyncRegionServerAdmin regionServer;
+
+    public RegionServerSinkPeer(ServerName serverName,
+      AsyncRegionServerAdmin replicationServer) {
+      super(serverName);
+      this.regionServer = replicationServer;
+    }
+
+    public void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId,
+      Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) throws IOException {
+      ReplicationProtobufUtil.replicateWALEntry(regionServer, entries, replicationClusterId,
+        sourceBaseNamespaceDir, sourceHFileArchiveDir, timeout);
+    }
+  }
+
+  public static class ReplicationServerSinkPeer extends SinkPeer {
+
+    private AsyncReplicationServerAdmin replicationServer;
+
+    public ReplicationServerSinkPeer(ServerName serverName,
+      AsyncReplicationServerAdmin replicationServer) {
+      super(serverName);
+      this.replicationServer = replicationServer;
+    }
+
+    public void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId,
+      Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) throws IOException {
+      ReplicationProtobufUtil.replicateWALEntry(replicationServer, entries, replicationClusterId,
+        sourceBaseNamespaceDir, sourceHFileArchiveDir, timeout);
+    }
+  }
+
+  private SinkPeer createSinkPeer(ServerName serverName) throws IOException {
+    if (ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) {
+      return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName));
+    } else {
+      return new RegionServerSinkPeer(serverName, conn.getRegionServerAdmin(serverName));
     }
   }
 }
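With this hierarchy, callers no longer need to know which sink type createSinkPeer selected; they dispatch through the abstract SinkPeer, as the endpoint change below shows. A minimal sketch (arguments hypothetical):

    // sinkPeer is either a RegionServerSinkPeer or a ReplicationServerSinkPeer.
    SinkPeer sinkPeer = getReplicationSink();
    sinkPeer.replicateWALEntry(entries, replicationClusterId,
        baseNamespaceDir, hfileArchiveDir, timeout);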
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
index 1b9b699..15d4f8c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
@@ -27,14 +27,12 @@ import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.PriorityFunction;
-import org.apache.hadoop.hbase.ipc.QosPriority;
 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
 import org.apache.hadoop.hbase.ipc.RpcServerFactory;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
@@ -58,53 +56,11 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
@@ -117,7 +73,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 @InterfaceAudience.Private
 @SuppressWarnings("deprecation")
 public class ReplicationServerRpcServices implements HBaseRPCErrorHandler,
-    AdminService.BlockingInterface, PriorityFunction {
+    ReplicationServerService.BlockingInterface, PriorityFunction {
 
   protected static final Logger LOG = LoggerFactory.getLogger(ReplicationServerRpcServices.class);
 
@@ -256,8 +212,8 @@ public class ReplicationServerRpcServices implements HBaseRPCErrorHandler,
   protected List<BlockingServiceAndInterface> getServices() {
     List<BlockingServiceAndInterface> bssi = new ArrayList<>();
     bssi.add(new BlockingServiceAndInterface(
-      AdminService.newReflectiveBlockingService(this),
-      AdminService.BlockingInterface.class));
+      ReplicationServerService.newReflectiveBlockingService(this),
+        ReplicationServerService.BlockingInterface.class));
     return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
   }
 
@@ -325,154 +281,6 @@ public class ReplicationServerRpcServices implements HBaseRPCErrorHandler,
     }
   }
 
-  @Override
-  public GetRegionInfoResponse getRegionInfo(RpcController controller, GetRegionInfoRequest request)
-      throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public GetStoreFileResponse getStoreFile(RpcController controller, GetStoreFileRequest request)
-      throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public GetOnlineRegionResponse getOnlineRegion(RpcController controller,
-      GetOnlineRegionRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public OpenRegionResponse openRegion(RpcController controller, OpenRegionRequest request)
-      throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public WarmupRegionResponse warmupRegion(RpcController controller, WarmupRegionRequest request)
-      throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public CloseRegionResponse closeRegion(RpcController controller, CloseRegionRequest request)
-      throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public FlushRegionResponse flushRegion(RpcController controller, FlushRegionRequest request)
-      throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public CompactionSwitchResponse compactionSwitch(RpcController controller,
-      CompactionSwitchRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public CompactRegionResponse compactRegion(RpcController controller,
-      CompactRegionRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public ReplicateWALEntryResponse replay(RpcController controller,
-      ReplicateWALEntryRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public RollWALWriterResponse rollWALWriter(RpcController controller, RollWALWriterRequest request)
-      throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public GetServerInfoResponse getServerInfo(RpcController controller, GetServerInfoRequest request)
-      throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  /**
-   * Stop the replication server.
-   *
-   * @param controller the RPC controller
-   * @param request the request
-   */
-  @Override
-  @QosPriority(priority=HConstants.ADMIN_QOS)
-  public StopServerResponse stopServer(final RpcController controller,
-      final StopServerRequest request) {
-    requestCount.increment();
-    String reason = request.getReason();
-    replicationServer.stop(reason);
-    return StopServerResponse.newBuilder().build();
-  }
-
-  @Override
-  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
-      UpdateFavoredNodesRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public UpdateConfigurationResponse updateConfiguration(RpcController controller,
-      UpdateConfigurationRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public GetRegionLoadResponse getRegionLoad(RpcController controller,
-      GetRegionLoadRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
-      ClearCompactionQueuesRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
-      ClearRegionBlockCacheRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
-      GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public ExecuteProceduresResponse executeProcedures(RpcController controller,
-      ExecuteProceduresRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public SlowLogResponses getSlowLogResponses(RpcController controller,
-      SlowLogResponseRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public SlowLogResponses getLargeLogResponses(RpcController controller,
-      SlowLogResponseRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public ClearSlowLogResponses clearSlowLogsResponses(RpcController controller,
-      ClearSlowLogResponseRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
   protected AccessChecker getAccessChecker() {
     return accessChecker;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index b127b46..942a56f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -52,7 +52,6 @@ import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
 import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
@@ -567,11 +566,9 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
           logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
       }
       sinkPeer = getReplicationSink();
-      AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
       try {
-        ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
-          entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir,
-          hfileArchiveDir, timeout);
+        sinkPeer.replicateWALEntry(entries.toArray(new Entry[entries.size()]), replicationClusterId,
+            baseNamespaceDir, hfileArchiveDir, timeout);
         if (LOG.isTraceEnabled()) {
           LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
         }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 1c7f093..dc7f1a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -441,7 +441,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
             t.getName());
           manager.refreshSources(peerId);
           break;
-        } catch (IOException e1) {
+        } catch (ReplicationException | IOException e1) {
           LOG.error("Replication sources refresh failed.", e1);
           sleepForRetries("Sleeping before try refreshing sources again",
             maxRetriesMultiplier);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
index 8755749..5af4086 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
@@ -109,6 +109,11 @@ public class DummyAsyncClusterConnection implements AsyncClusterConnection {
   }
 
   @Override
+  public AsyncReplicationServerAdmin getReplicationServerAdmin(ServerName serverName) {
+    return null;
+  }
+
+  @Override
   public NonceGenerator getNonceGenerator() {
     return null;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
index 4160141..4182eaf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
@@ -28,7 +28,8 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
-import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
+import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.ReplicationServerSinkPeer;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.SinkPeer;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -99,7 +100,7 @@ public class TestHBaseReplicationEndpoint {
     // Sanity check
     assertEquals(1, endpoint.getNumSinks());
 
-    SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
+    SinkPeer sinkPeer = mockSinkPeer(serverNameA);
     endpoint.reportBadSink(sinkPeer);
     // Just reporting a bad sink once shouldn't have an effect
     assertEquals(1, endpoint.getNumSinks());
@@ -123,7 +124,7 @@ public class TestHBaseReplicationEndpoint {
     assertEquals(expected, endpoint.getNumSinks());
 
     ServerName badSinkServer0 = endpoint.getSinkServers().get(0);
-    SinkPeer sinkPeer = new SinkPeer(badSinkServer0, mock(AsyncRegionServerAdmin.class));
+    SinkPeer sinkPeer = mockSinkPeer(badSinkServer0);
     for (int i = 0; i <= HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) {
       endpoint.reportBadSink(sinkPeer);
     }
@@ -133,7 +134,7 @@ public class TestHBaseReplicationEndpoint {
 
     // now try a sink that has some successes
     ServerName badSinkServer1 = endpoint.getSinkServers().get(0);
-    sinkPeer = new SinkPeer(badSinkServer1, mock(AsyncRegionServerAdmin.class));
+    sinkPeer = mockSinkPeer(badSinkServer1);
     for (int i = 0; i < HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) {
       endpoint.reportBadSink(sinkPeer);
     }
@@ -168,8 +169,8 @@ public class TestHBaseReplicationEndpoint {
     ServerName serverNameA = endpoint.getSinkServers().get(0);
     ServerName serverNameB = endpoint.getSinkServers().get(1);
 
-    SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
-    SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class));
+    SinkPeer sinkPeerA = mockSinkPeer(serverNameA);
+    SinkPeer sinkPeerB = mockSinkPeer(serverNameB);
 
     for (int i = 0; i <= HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) {
       endpoint.reportBadSink(sinkPeerA);
@@ -207,4 +208,8 @@ public class TestHBaseReplicationEndpoint {
       return null;
     }
   }
+
+  private SinkPeer mockSinkPeer(ServerName serverName) {
+    return new ReplicationServerSinkPeer(serverName, mock(AsyncReplicationServerAdmin.class));
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
index 6a0ef3d..0ef23f2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
@@ -30,14 +30,15 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
-import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
+import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.ReplicationServerSinkPeer;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -116,22 +117,48 @@ public class TestReplicationServer {
     TEST_UTIL.deleteTableIfAny(TABLENAME);
   }
 
+  /**
+   * Requests replication server using {@link AsyncReplicationServerAdmin}
+   */
   @Test
   public void testReplicateWAL() throws Exception {
-    AsyncClusterConnection conn = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().get(0)
-        .getRegionServer().getAsyncClusterConnection();
-    AsyncRegionServerAdmin rsAdmin = conn.getRegionServerAdmin(replicationServer.getServerName());
+    AsyncClusterConnection conn =
+        TEST_UTIL.getHBaseCluster().getMaster().getAsyncClusterConnection();
+    AsyncReplicationServerAdmin replAdmin =
+        conn.getReplicationServerAdmin(replicationServer.getServerName());
+
+    ReplicationServerSinkPeer sinkPeer =
+        new ReplicationServerSinkPeer(replicationServer.getServerName(), replAdmin);
+    replicateWALEntryAndVerify(sinkPeer);
+  }
+
+  /**
+   * Requests region server using {@link AsyncReplicationServerAdmin}
+   */
+  @Test
+  public void testReplicateWAL2() throws Exception {
+    AsyncClusterConnection conn =
+        TEST_UTIL.getHBaseCluster().getMaster().getAsyncClusterConnection();
+    ServerName rs = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().get(0)
+        .getRegionServer().getServerName();
+    AsyncReplicationServerAdmin replAdmin = conn.getReplicationServerAdmin(rs);
+
+    ReplicationServerSinkPeer
+      sinkPeer = new ReplicationServerSinkPeer(rs, replAdmin);
+    replicateWALEntryAndVerify(sinkPeer);
+  }
 
+  private void replicateWALEntryAndVerify(ReplicationServerSinkPeer sinkPeer) throws Exception {
     Entry[] entries = new Entry[BATCH_SIZE];
     for(int i = 0; i < BATCH_SIZE; i++) {
       entries[i] = generateEdit(i, TABLENAME, Bytes.toBytes(i));
     }
 
-    ReplicationProtbufUtil.replicateWALEntry(rsAdmin, entries, replicationClusterId,
-        baseNamespaceDir, hfileArchiveDir, 1000);
+    sinkPeer.replicateWALEntry(entries, replicationClusterId, baseNamespaceDir, hfileArchiveDir,
+        1000);
 
+    Table table = TEST_UTIL.getConnection().getTable(TABLENAME);
     for (int i = 0; i < BATCH_SIZE; i++) {
-      Table table = TEST_UTIL.getConnection().getTable(TABLENAME);
       Result result = table.get(new Get(Bytes.toBytes(i)));
       Cell cell = result.getColumnLatestCell(Bytes.toBytes(FAMILY), Bytes.toBytes(FAMILY));
       assertNotNull(cell);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 60624e2..5817c00 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -510,7 +510,7 @@ public class TestReplicationSource {
     String queueId = "qid";
     RegionServerServices rss =
       TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
-    rs.init(conf, null, manager, null, mockPeer, rss, queueId, null,
+    rs.init(conf, null, null, manager, null, mockPeer, rss, queueId, null,
       p -> OptionalLong.empty(), new MetricsSource(queueId));
     try {
       rs.startup();
@@ -547,7 +547,7 @@ public class TestReplicationSource {
     String queueId = "qid";
     RegionServerServices rss =
       TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
-    rs.init(conf, null, manager, null, mockPeer, rss, queueId, null,
+    rs.init(conf, null, null, manager, null, mockPeer, rss, queueId, null,
       p -> OptionalLong.empty(), new MetricsSource(queueId));
     try {
       rs.startup();


[hbase] 03/07: HBASE-24735: Refactor ReplicationSourceManager: move logPositionAndCleanOldLogs/cleanUpHFileRefs to ReplicationSource inside (#2064)

Posted by zg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 5a62429a67577fda09ddbac2d954473d05b172f3
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Tue Aug 11 20:07:09 2020 +0800

    HBASE-24735: Refactor ReplicationSourceManager: move logPositionAndCleanOldLogs/cleanUpHFileRefs to ReplicationSource inside (#2064)
    
    Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
 .../regionserver/RecoveredReplicationSource.java   |  18 ++-
 .../regionserver/ReplicationSource.java            | 160 +++++++++++++++++++--
 .../regionserver/ReplicationSourceInterface.java   |  33 +++--
 .../regionserver/ReplicationSourceManager.java     | 158 ++------------------
 .../regionserver/ReplicationSourceShipper.java     |  17 +--
 .../regionserver/ReplicationSourceWALReader.java   |  17 +--
 .../SerialReplicationSourceWALReader.java          |   1 +
 .../replication/regionserver/WALEntryBatch.java    |   2 +-
 .../hbase/replication/ReplicationSourceDummy.java  |  24 ++--
 .../regionserver/TestReplicationSource.java        |  12 +-
 .../regionserver/TestReplicationSourceManager.java |  50 +++----
 .../regionserver/TestWALEntryStream.java           |  20 +--
 12 files changed, 258 insertions(+), 254 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 00aa026..62685ee 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.PriorityBlockingQueue;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -44,15 +45,18 @@ public class RecoveredReplicationSource extends ReplicationSource {
 
   private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class);
 
+  private Path walDir;
+
   private String actualPeerId;
 
   @Override
-  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
-      String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
-      MetricsSource metrics) throws IOException {
-    super.init(conf, fs, manager, queueStorage, replicationPeer, server, peerClusterZnode,
+  public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
+    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
+    String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
+    MetricsSource metrics) throws IOException {
+    super.init(conf, fs, walDir, manager, queueStorage, replicationPeer, server, peerClusterZnode,
       clusterId, walFileLengthProvider, metrics);
+    this.walDir = walDir;
     this.actualPeerId = this.replicationQueueInfo.getPeerId();
   }
 
@@ -93,7 +97,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
               deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) };
           for (Path possibleLogLocation : locs) {
             LOG.info("Possible location " + possibleLogLocation.toUri().toString());
-            if (manager.getFs().exists(possibleLogLocation)) {
+            if (this.fs.exists(possibleLogLocation)) {
               // We found the right new location
               LOG.info("Log " + path + " still exists at " + possibleLogLocation);
               newPaths.add(possibleLogLocation);
@@ -126,7 +130,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
   // N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal
   // area rather than to the wal area for a particular region server.
   private Path getReplSyncUpPath(Path path) throws IOException {
-    FileStatus[] rss = fs.listStatus(manager.getLogDir());
+    FileStatus[] rss = fs.listStatus(walDir);
     for (FileStatus rs : rss) {
       Path p = rs.getPath();
       FileStatus[] logs = fs.listStatus(p);
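After this refactor the WAL directory is passed into init directly instead of being read back from the manager. A minimal sketch of the widened call, mirroring the test updates elsewhere in this patch (all arguments hypothetical):

    // walDir is now the third parameter of ReplicationSourceInterface#init.
    source.init(conf, fs, walDir, manager, queueStorage, replicationPeer, server,
        queueId, clusterId, walFileLengthProvider, metrics);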
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 2716ade..1c7f093 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -28,7 +28,9 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -37,6 +39,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.mutable.MutableBoolean;
@@ -54,16 +57,20 @@ import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
 import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
 import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -95,7 +102,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
   protected ReplicationQueueInfo replicationQueueInfo;
 
   // The manager of all sources to which we ping back our progress
-  protected ReplicationSourceManager manager;
+  ReplicationSourceManager manager;
   // Should we stop everything?
   protected Server server;
   // How long should we sleep for each retry
@@ -140,8 +147,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
   protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
       new ConcurrentHashMap<>();
 
-  private AtomicLong totalBufferUsed;
-
   public static final String WAIT_ON_ENDPOINT_SECONDS =
     "hbase.replication.wait.on.endpoint.seconds";
   public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
@@ -193,7 +198,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
    * @param metrics metrics for replication source
    */
   @Override
-  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
+  public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
       ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
       String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
       MetricsSource metrics) throws IOException {
@@ -221,7 +226,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
     defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
     currentBandwidth = getCurrentBandwidth();
     this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
-    this.totalBufferUsed = manager.getTotalBufferUsed();
     this.walFileLengthProvider = walFileLengthProvider;
 
     this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort",
@@ -408,9 +412,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
 
   private ReplicationSourceWALReader createNewWALReader(String walGroupId,
       PriorityBlockingQueue<Path> queue, long startPosition) {
-    return replicationPeer.getPeerConfig().isSerial()
-      ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this)
-      : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
+    return replicationPeer.getPeerConfig().isSerial() ?
+      new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this) :
+      new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
   }
 
   /**
@@ -452,11 +456,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
   }
 
   @Override
-  public ReplicationSourceManager getSourceManager() {
-    return this.manager;
-  }
-
-  @Override
   public void tryThrottle(int batchSize) throws InterruptedException {
     checkBandwidthChangeAndResetThrottler();
     if (throttler.isEnabled()) {
@@ -784,7 +783,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
       throttler.addPushSize(batchSize);
     }
     totalReplicatedEdits.addAndGet(entries.size());
-    long newBufferUsed = totalBufferUsed.addAndGet(-batchSize);
+    long newBufferUsed = manager.getTotalBufferUsed().addAndGet(-batchSize);
     // Record the new buffer usage
     this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
   }
@@ -819,4 +818,137 @@ public class ReplicationSource implements ReplicationSourceInterface {
   private String logPeerId(){
     return "peerId=" + this.getPeerId() + ",";
   }
+
+  @VisibleForTesting
+  public void setWALPosition(WALEntryBatch entryBatch) {
+    String fileName = entryBatch.getLastWalPath().getName();
+    interruptOrAbortWhenFail(() -> this.queueStorage
+      .setWALPosition(server.getServerName(), getQueueId(), fileName,
+        entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
+  }
+
+  @VisibleForTesting
+  public void cleanOldWALs(String log, boolean inclusive) {
+    NavigableSet<String> walsToRemove = getWalsToRemove(log, inclusive);
+    if (walsToRemove.isEmpty()) {
+      return;
+    }
+    // cleanOldWALs may take some time, especially for sync replication where we may want to
+    // remove remote wals as the remote cluster may already be down, so we do it outside
+    // the lock to avoid blocking preLogRoll
+    cleanOldWALs(walsToRemove);
+  }
+
+  private NavigableSet<String> getWalsToRemove(String log, boolean inclusive) {
+    NavigableSet<String> walsToRemove = new TreeSet<>();
+    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
+    try {
+      this.queueStorage.getWALsInQueue(this.server.getServerName(), getQueueId()).forEach(wal -> {
+        LOG.debug("getWalsToRemove wal {}", wal);
+        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+        if (walPrefix.equals(logPrefix)) {
+          walsToRemove.add(wal);
+        }
+      });
+    } catch (ReplicationException e) {
+      // Just log the exception here, as the recovered replication source will try to clean up again.
+      LOG.warn("Failed to read wals in queue {}", getQueueId(), e);
+    }
+    return walsToRemove.headSet(log, inclusive);
+  }
+
+  private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
+    throws IOException {
+    Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
+    FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
+    for (String wal : wals) {
+      Path walFile = new Path(remoteWALDirForPeer, wal);
+      try {
+        if (!fs.delete(walFile, false) && fs.exists(walFile)) {
+          throw new IOException("Can not delete " + walFile);
+        }
+      } catch (FileNotFoundException e) {
+        // Just ignore since this means the file has already been deleted.
+        // The javadoc of the FileSystem.delete methods does not specify the behavior of deleting a
+        // nonexistent file, so here we handle both cases: we check the return value of
+        // FileSystem.delete and also catch FNFE.
+        LOG.debug("The remote wal {} has already been deleted?", walFile, e);
+      }
+    }
+  }
+
+  private void cleanOldWALs(NavigableSet<String> wals) {
+    LOG.debug("Removing {} logs in the list: {}", wals.size(), wals);
+    // The intention here is that we want to delete the remote wal files ASAP, as they may affect
+    // the failover time if you want to transition the remote cluster from S to A. And the infinite
+    // retry is not a problem: if we cannot reach the remote HDFS cluster, then usually we cannot
+    // reach the HBase cluster either, so replication will be blocked anyway.
+    if (isSyncReplication()) {
+      String peerId = getPeerId();
+      String remoteWALDir = replicationPeer.getPeerConfig().getRemoteWALDir();
+      // Filter out the wals that need to be removed from the remote directory. Their names
+      // should be in the special format, and the peer id in each name should match the peer id
+      // of the replication source.
+      List<String> remoteWals = wals.stream().filter(w -> SyncReplicationWALProvider
+        .getSyncReplicationPeerIdFromWALName(w).map(peerId::equals).orElse(false))
+        .collect(Collectors.toList());
+      LOG.debug("Removing {} logs from remote dir {} in the list: {}", remoteWals.size(),
+        remoteWALDir, remoteWals);
+      if (!remoteWals.isEmpty()) {
+        for (int sleepMultiplier = 0;;) {
+          try {
+            removeRemoteWALs(peerId, remoteWALDir, remoteWals);
+            break;
+          } catch (IOException e) {
+            LOG.warn("Failed to delete remote wals from remote dir {} for peer {}", remoteWALDir,
+              peerId);
+          }
+          if (!isSourceActive()) {
+            // skip the following operations
+            return;
+          }
+          if (ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries,
+            sleepMultiplier, maxRetriesMultiplier)) {
+            sleepMultiplier++;
+          }
+        }
+      }
+    }
+    for (String wal : wals) {
+      interruptOrAbortWhenFail(
+        () -> this.queueStorage.removeWAL(server.getServerName(), getQueueId(), wal));
+    }
+  }
+
+  public void cleanUpHFileRefs(List<String> files) {
+    interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(getPeerId(), files));
+  }
+
+  @FunctionalInterface
+  private interface ReplicationQueueOperation {
+    void exec() throws ReplicationException;
+  }
+
+  /**
+   * Refreshing a replication source terminates the old source first, so the source thread will be
+   * interrupted. We need to handle that instead of aborting the region server.
+   */
+  private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
+    try {
+      op.exec();
+    } catch (ReplicationException e) {
+      if (e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException
+        && e.getCause().getCause() != null && e.getCause()
+        .getCause() instanceof InterruptedException) {
+        // A ReplicationRuntimeException (a RuntimeException) is thrown out here. The reason is
+        // that the thread was interrupted deep down in the stack, so it should skip the following
+        // processing logic and propagate to the topmost layer, which can handle this exception
+        // properly. In this specific case, that layer is ReplicationSourceShipper#run().
+        throw new ReplicationRuntimeException(
+          "Thread is interrupted, the replication source may be terminated",
+          e.getCause().getCause());
+      }
+      server.abort("Failed to operate on replication queue", e);
+    }
+  }
 }
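
The interruptOrAbortWhenFail/ReplicationQueueOperation pair moved into ReplicationSource above follows a reusable pattern: wrap a checked queue operation in a functional interface, then inspect the cause chain to distinguish an interrupt (the source is being refreshed) from a genuine failure (which aborts the server). A minimal, self-contained sketch of that pattern; QueueOp and QueueException are hypothetical stand-ins, not the HBase types, and the nesting in main() mirrors how a ZooKeeper SystemErrorException wraps the InterruptedException two levels deep:

public class InterruptUnwrapSketch {

  // Hypothetical stand-in for ReplicationException.
  static class QueueException extends Exception {
    QueueException(String msg, Throwable cause) { super(msg, cause); }
  }

  // Hypothetical stand-in for ReplicationQueueOperation.
  @FunctionalInterface
  interface QueueOp {
    void exec() throws QueueException;
  }

  static void runOrPropagateInterrupt(QueueOp op) {
    try {
      op.exec();
    } catch (QueueException e) {
      Throwable mid = e.getCause();
      // An interrupt surfaces as a nested cause two levels down; rethrow it unchecked so the
      // caller's run() loop can exit instead of aborting the whole process.
      if (mid != null && mid.getCause() instanceof InterruptedException) {
        throw new RuntimeException("Interrupted while updating the replication queue",
          mid.getCause());
      }
      throw new IllegalStateException("Queue update failed; a real server would abort here", e);
    }
  }

  public static void main(String[] args) {
    try {
      runOrPropagateInterrupt(() -> {
        throw new QueueException("zookeeper error",
          new Exception("system error", new InterruptedException("source refreshed")));
      });
    } catch (RuntimeException expected) {
      System.out.println("propagated: " + expected.getMessage());
    }
  }
}
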
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 33a413f..321edc2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -43,15 +43,15 @@ public interface ReplicationSourceInterface {
 
   /**
    * Initializer for the source
-   * @param conf the configuration to use
-   * @param fs the file system to use
-   * @param manager the manager to use
+   *
+   * @param conf   the configuration to use
+   * @param fs     the file system to use
    * @param server the server for this region server
    */
-  void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
-      String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
-      MetricsSource metrics) throws IOException;
+  void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
+    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
+    String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
+    MetricsSource metrics) throws IOException;
 
   /**
    * Add a log to the list of logs to replicate
@@ -148,11 +148,6 @@ public interface ReplicationSourceInterface {
   ReplicationEndpoint getReplicationEndpoint();
 
   /**
-   * @return the replication source manager
-   */
-  ReplicationSourceManager getSourceManager();
-
-  /**
    * @return the wal file length provider
    */
   WALFileLengthProvider getWALFileLengthProvider();
@@ -192,4 +187,18 @@ public interface ReplicationSourceInterface {
   default boolean isRecovered() {
     return false;
   }
+
+  /**
+   * Persist the current WAL position to {@link ReplicationQueueStorage}.
+   * @param entryBatch a batch of WAL entries to replicate
+   */
+  void setWALPosition(WALEntryBatch entryBatch);
+
+  /**
+   * Cleans a WAL and all older WALs from the replication queue. Called when we are sure that a
+   * WAL is closed and has no more entries.
+   * @param walName the name of the WAL
+   * @param inclusive whether we should also remove the given WAL
+   */
+  void cleanOldWALs(String walName, boolean inclusive);
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index a222f4b..3212697 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -17,10 +17,8 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -59,14 +57,11 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
-import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -324,7 +319,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     MetricsSource metrics = new MetricsSource(queueId);
     sourceMetrics.put(queueId, metrics);
     // init replication source
-    src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
+    src.init(conf, fs, logDir, this, queueStorage, replicationPeer, server, queueId, clusterId,
       walFileLengthProvider, metrics);
     return src;
   }
@@ -528,29 +523,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     void exec() throws ReplicationException;
   }
 
-  /**
-   * Refresh replication source will terminate the old source first, then the source thread will be
-   * interrupted. Need to handle it instead of abort the region server.
-   */
-  private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
-    try {
-      op.exec();
-    } catch (ReplicationException e) {
-      if (e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException
-          && e.getCause().getCause() != null && e.getCause()
-          .getCause() instanceof InterruptedException) {
-        // ReplicationRuntimeException(a RuntimeException) is thrown out here. The reason is
-        // that thread is interrupted deep down in the stack, it should pass the following
-        // processing logic and propagate to the most top layer which can handle this exception
-        // properly. In this specific case, the top layer is ReplicationSourceShipper#run().
-        throw new ReplicationRuntimeException(
-          "Thread is interrupted, the replication source may be terminated",
-          e.getCause().getCause());
-      }
-      server.abort("Failed to operate on replication queue", e);
-    }
-  }
-
   private void abortWhenFail(ReplicationQueueOperation op) {
     try {
       op.exec();
@@ -576,107 +548,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     }
   }
 
-  /**
-   * This method will log the current position to storage. And also clean old logs from the
-   * replication queue.
-   * @param source the replication source
-   * @param entryBatch the wal entry batch we just shipped
-   */
-  public void logPositionAndCleanOldLogs(ReplicationSourceInterface source,
-      WALEntryBatch entryBatch) {
-    String fileName = entryBatch.getLastWalPath().getName();
-    interruptOrAbortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(),
-      source.getQueueId(), fileName, entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
-    cleanOldLogs(fileName, entryBatch.isEndOfFile(), source);
-  }
-
-  /**
-   * Cleans a log file and all older logs from replication queue. Called when we are sure that a log
-   * file is closed and has no more entries.
-   * @param log Path to the log
-   * @param inclusive whether we should also remove the given log file
-   * @param source the replication source
-   */
-  @VisibleForTesting
-  void cleanOldLogs(String log, boolean inclusive,
-    ReplicationSourceInterface source) {
-    NavigableSet<String> walsToRemove;
-    synchronized (this.latestPaths) {
-      walsToRemove = getWalsToRemove(source.getQueueId(), log, inclusive);
-    }
-    if (walsToRemove.isEmpty()) {
-      return;
-    }
-    // cleanOldLogs may spend some time, especially for sync replication where we may want to
-    // remove remote wals as the remote cluster may have already been down, so we do it outside
-    // the lock to avoid block preLogRoll
-    cleanOldLogs(walsToRemove, source);
-  }
-
-  private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
-      throws IOException {
-    Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
-    FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
-    for (String wal : wals) {
-      Path walFile = new Path(remoteWALDirForPeer, wal);
-      try {
-        if (!fs.delete(walFile, false) && fs.exists(walFile)) {
-          throw new IOException("Can not delete " + walFile);
-        }
-      } catch (FileNotFoundException e) {
-        // Just ignore since this means the file has already been deleted.
-        // The javadoc of the FileSystem.delete methods does not specify the behavior of deleting an
-        // inexistent file, so here we deal with both, i.e, check the return value of the
-        // FileSystem.delete, and also catch FNFE.
-        LOG.debug("The remote wal {} has already been deleted?", walFile, e);
-      }
-    }
-  }
-
-  private void cleanOldLogs(NavigableSet<String> wals, ReplicationSourceInterface source) {
-    LOG.debug("Removing {} logs in the list: {}", wals.size(), wals);
-    // The intention here is that, we want to delete the remote wal files ASAP as it may effect the
-    // failover time if you want to transit the remote cluster from S to A. And the infinite retry
-    // is not a problem, as if we can not contact with the remote HDFS cluster, then usually we can
-    // not contact with the HBase cluster either, so the replication will be blocked either.
-    if (source.isSyncReplication()) {
-      String peerId = source.getPeerId();
-      String remoteWALDir = source.getPeer().getPeerConfig().getRemoteWALDir();
-      // Filter out the wals need to be removed from the remote directory. Its name should be the
-      // special format, and also, the peer id in its name should match the peer id for the
-      // replication source.
-      List<String> remoteWals = wals.stream().filter(w -> SyncReplicationWALProvider
-        .getSyncReplicationPeerIdFromWALName(w).map(peerId::equals).orElse(false))
-        .collect(Collectors.toList());
-      LOG.debug("Removing {} logs from remote dir {} in the list: {}", remoteWals.size(),
-        remoteWALDir, remoteWals);
-      if (!remoteWals.isEmpty()) {
-        for (int sleepMultiplier = 0;;) {
-          try {
-            removeRemoteWALs(peerId, remoteWALDir, remoteWals);
-            break;
-          } catch (IOException e) {
-            LOG.warn("Failed to delete remote wals from remote dir {} for peer {}", remoteWALDir,
-              peerId);
-          }
-          if (!source.isSourceActive()) {
-            // skip the following operations
-            return;
-          }
-          if (ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries,
-            sleepMultiplier, maxRetriesMultiplier)) {
-            sleepMultiplier++;
-          }
-        }
-      }
-    }
-    String queueId = source.getQueueId();
-    for (String wal : wals) {
-      interruptOrAbortWhenFail(
-        () -> this.queueStorage.removeWAL(server.getServerName(), queueId, wal));
-    }
-  }
-
   // public because of we call it in TestReplicationEmptyWALRecovery
   @VisibleForTesting
   public void preLogRoll(Path newLog) throws IOException {
@@ -1092,10 +963,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     }
   }
 
-  public void cleanUpHFileRefs(String peerId, List<String> files) {
-    interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files));
-  }
-
   int activeFailoverTaskCount() {
     return executor.getActiveCount();
   }
@@ -1104,20 +971,13 @@ public class ReplicationSourceManager implements ReplicationListener {
     return this.globalMetrics;
   }
 
-  private NavigableSet<String> getWalsToRemove(String queueId, String log, boolean inclusive) {
-    NavigableSet<String> walsToRemove = new TreeSet<>();
-    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
-    try {
-      this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId).forEach(wal -> {
-        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
-        if (walPrefix.equals(logPrefix)) {
-          walsToRemove.add(wal);
-        }
-      });
-    } catch (ReplicationException e) {
-      // Just log the exception here, as the recovered replication source will try to cleanup again.
-      LOG.warn("Failed to read wals in queue {}", queueId, e);
-    }
-    return walsToRemove.headSet(log, inclusive);
+  @InterfaceAudience.Private
+  Server getServer() {
+    return this.server;
+  }
+
+  @InterfaceAudience.Private
+  ReplicationQueueStorage getQueueStorage() {
+    return this.queueStorage;
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index bb55275..eb558e0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -242,12 +242,6 @@ public class ReplicationSourceShipper extends Thread {
   }
 
   private void cleanUpHFileRefs(WALEdit edit) throws IOException {
-    String peerId = source.getPeerId();
-    if (peerId.contains("-")) {
-      // peerClusterZnode will be in the form peerId + "-" + rsZNode.
-      // A peerId will not have "-" in its name, see HBASE-11394
-      peerId = peerId.split("-")[0];
-    }
     List<Cell> cells = edit.getCells();
     int totalCells = cells.size();
     for (int i = 0; i < totalCells; i++) {
@@ -258,7 +252,7 @@ public class ReplicationSourceShipper extends Thread {
         int totalStores = stores.size();
         for (int j = 0; j < totalStores; j++) {
           List<String> storeFileList = stores.get(j).getStoreFileList();
-          source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList);
+          source.cleanUpHFileRefs(storeFileList);
           source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size());
         }
       }
@@ -270,10 +264,11 @@ public class ReplicationSourceShipper extends Thread {
     // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file
     // record on zk, so let's call it. The last wal position maybe zero if end of file is true and
     // there is no entry in the batch. It is OK because that the queue storage will ignore the zero
-    // position and the file will be removed soon in cleanOldLogs.
-    if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) ||
-      batch.getLastWalPosition() != currentPosition) {
-      source.getSourceManager().logPositionAndCleanOldLogs(source, batch);
+    // position and the file will be removed soon in cleanOldWALs.
+    if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath)
+      || batch.getLastWalPosition() != currentPosition) {
+      source.setWALPosition(batch);
+      source.cleanOldWALs(batch.getLastWalPath().getName(), batch.isEndOfFile());
       updated = true;
     }
     // if end of file is true, then we can just skip to the next file in queue.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index a6d8787..65afe48 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -25,7 +25,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -75,9 +74,6 @@ class ReplicationSourceWALReader extends Thread {
   //Indicates whether this particular worker is running
   private boolean isReaderRunning = true;
 
-  private AtomicLong totalBufferUsed;
-  private long totalBufferQuota;
-
   /**
    * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
    * entries, and puts them on a batch queue.
@@ -103,8 +99,6 @@ class ReplicationSourceWALReader extends Thread {
     // memory used will be batchSizeCapacity * (nb.batches + 1)
     // the +1 is for the current thread reading before placing onto the queue
     int batchCount = conf.getInt("replication.source.nb.batches", 1);
-    this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
-    this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit();
     this.sleepForRetries =
         this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
     this.maxRetriesMultiplier =
@@ -274,9 +268,10 @@ class ReplicationSourceWALReader extends Thread {
   //returns false if we've already exceeded the global quota
   private boolean checkQuota() {
     // try not to go over total quota
-    if (totalBufferUsed.get() > totalBufferQuota) {
+    if (source.manager.getTotalBufferUsed().get() > source.manager.getTotalBufferLimit()) {
       LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
-          this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota);
+        this.source.getPeerId(), source.manager.getTotalBufferUsed().get(),
+        source.manager.getTotalBufferLimit());
       Threads.sleep(sleepForRetries);
       return false;
     }
@@ -405,10 +400,10 @@ class ReplicationSourceWALReader extends Thread {
    * @return true if we should clear buffer and push all
    */
   private boolean acquireBufferQuota(long size) {
-    long newBufferUsed = totalBufferUsed.addAndGet(size);
+    long newBufferUsed = source.manager.getTotalBufferUsed().addAndGet(size);
     // Record the new buffer usage
-    this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
-    return newBufferUsed >= totalBufferQuota;
+    source.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+    return newBufferUsed >= source.manager.getTotalBufferLimit();
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
index 9edcc8a..2108ddc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
 import java.util.concurrent.PriorityBlockingQueue;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
index 4f96c96..591b44d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -31,7 +31,7 @@ import org.apache.yetus.audience.InterfaceAudience;
  * Holds a batch of WAL entries to replicate, along with some statistics
  */
 @InterfaceAudience.Private
-class WALEntryBatch {
+public class WALEntryBatch {
 
   // used by recovered replication queue to indicate that all the entries have been read.
   public static final WALEntryBatch NO_MORE_DATA = new WALEntryBatch(0, null);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 781a1da..b75a7ed 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
+import org.apache.hadoop.hbase.replication.regionserver.WALEntryBatch;
 import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
@@ -38,7 +39,6 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
  */
 public class ReplicationSourceDummy implements ReplicationSourceInterface {
 
-  private ReplicationSourceManager manager;
   private ReplicationPeer replicationPeer;
   private String peerClusterId;
   private Path currentPath;
@@ -47,11 +47,10 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   private AtomicBoolean startup = new AtomicBoolean(false);
 
   @Override
-  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-      ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
-      UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
-      throws IOException {
-    this.manager = manager;
+  public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
+    ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
+    UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
+    throws IOException {
     this.peerClusterId = peerClusterId;
     this.metrics = metrics;
     this.walFileLengthProvider = walFileLengthProvider;
@@ -133,11 +132,6 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   }
 
   @Override
-  public ReplicationSourceManager getSourceManager() {
-    return manager;
-  }
-
-  @Override
   public void tryThrottle(int batchSize) throws InterruptedException {
   }
 
@@ -156,6 +150,14 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   }
 
   @Override
+  public void setWALPosition(WALEntryBatch entryBatch) {
+  }
+
+  @Override
+  public void cleanOldWALs(String walName, boolean inclusive) {
+  }
+
+  @Override
   public ReplicationPeer getPeer() {
     return replicationPeer;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 8b8dcd8..60624e2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -130,11 +130,11 @@ public class TestReplicationSource {
       thenReturn(DoNothingReplicationEndpoint.class.getName());
     Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
     ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
-    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
+    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
     String queueId = "qid";
     RegionServerServices rss =
       TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
-    rs.init(conf, null, manager, null, mockPeer, rss, queueId, null,
+    rs.init(conf, null, null, manager, null, mockPeer, rss, queueId, null,
       p -> OptionalLong.empty(), new MetricsSource(queueId));
     try {
       rs.startup();
@@ -168,11 +168,11 @@ public class TestReplicationSource {
       thenReturn(DoNothingReplicationEndpoint.class.getName());
     Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
     ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
-    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
+    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
     String queueId = "qid";
     RegionServerServices rss =
       TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
-    rs.init(conf, null, manager, null, mockPeer, rss, queueId,
+    rs.init(conf, null, null, manager, null, mockPeer, rss, queueId,
       uuid, p -> OptionalLong.empty(), new MetricsSource(queueId));
     try {
       rs.startup();
@@ -259,8 +259,8 @@ public class TestReplicationSource {
       Configuration testConf = HBaseConfiguration.create();
       testConf.setInt("replication.source.maxretriesmultiplier", 1);
       ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
-      Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
-      source.init(testConf, null, manager, null, mockPeer, null, "testPeer",
+      Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+      source.init(testConf, null, null, manager, null, mockPeer, null, "testPeer",
         null, p -> OptionalLong.empty(), null);
       ExecutorService executor = Executors.newSingleThreadExecutor();
       Future<?> future = executor.submit(
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 8e38114..4b685ce 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeMap;
@@ -333,8 +334,9 @@ public abstract class TestReplicationSourceManager {
     when(source.getQueueId()).thenReturn("1");
     when(source.isRecovered()).thenReturn(false);
     when(source.isSyncReplication()).thenReturn(false);
-    manager.logPositionAndCleanOldLogs(source,
-      new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath()));
+    WALEntryBatch batch = new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath());
+    source.setWALPosition(batch);
+    source.cleanOldWALs(batch.getLastWalPath().getName(), batch.isEndOfFile());
 
     wal.appendData(hri,
       new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
@@ -408,11 +410,10 @@ public abstract class TestReplicationSourceManager {
     assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
     String id = "1-" + server.getServerName().getServerName();
     assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
-    ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
-    when(source.getQueueId()).thenReturn(id);
-    when(source.isRecovered()).thenReturn(true);
-    when(source.isSyncReplication()).thenReturn(false);
-    manager.cleanOldLogs(file2, false, source);
+    ReplicationSourceInterface source = new ReplicationSource();
+    source.init(conf, fs, null, manager, manager.getQueueStorage(), rp1.getPeer("1"),
+      manager.getServer(), id, null, p -> OptionalLong.empty(), null);
+    source.cleanOldWALs(file2, false);
     // log1 should be deleted
     assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
   }
@@ -589,19 +590,15 @@ public abstract class TestReplicationSourceManager {
     }
   }
 
-  private ReplicationSourceInterface mockReplicationSource(String peerId) {
-    ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
-    when(source.getPeerId()).thenReturn(peerId);
-    when(source.getQueueId()).thenReturn(peerId);
-    when(source.isRecovered()).thenReturn(false);
-    when(source.isSyncReplication()).thenReturn(true);
+  private ReplicationPeer mockReplicationPeerForSyncReplication(String peerId) {
     ReplicationPeerConfig config = mock(ReplicationPeerConfig.class);
     when(config.getRemoteWALDir())
       .thenReturn(remoteLogDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
+    when(config.isSyncReplication()).thenReturn(true);
     ReplicationPeer peer = mock(ReplicationPeer.class);
     when(peer.getPeerConfig()).thenReturn(config);
-    when(source.getPeer()).thenReturn(peer);
-    return source;
+    when(peer.getId()).thenReturn(peerId);
+    return peer;
   }
 
   @Test
@@ -630,13 +627,19 @@ public abstract class TestReplicationSourceManager {
       manager.preLogRoll(wal);
       manager.postLogRoll(wal);
 
-      ReplicationSourceInterface source = mockReplicationSource(peerId2);
-      manager.cleanOldLogs(walName, true, source);
+      ReplicationSourceInterface source = new ReplicationSource();
+      source.init(conf, fs, null, manager, manager.getQueueStorage(),
+        mockReplicationPeerForSyncReplication(peerId2), manager.getServer(), peerId2, null,
+        p -> OptionalLong.empty(), null);
+      source.cleanOldWALs(walName, true);
       // still there if peer id does not match
       assertTrue(fs.exists(remoteWAL));
 
-      source = mockReplicationSource(slaveId);
-      manager.cleanOldLogs(walName, true, source);
+      source = new ReplicationSource();
+      source.init(conf, fs, null, manager, manager.getQueueStorage(),
+        mockReplicationPeerForSyncReplication(slaveId), manager.getServer(), slaveId, null,
+        p -> OptionalLong.empty(), null);
+      source.cleanOldWALs(walName, true);
       assertFalse(fs.exists(remoteWAL));
     } finally {
       removePeerAndWait(peerId2);
@@ -813,11 +816,10 @@ public abstract class TestReplicationSourceManager {
 
   static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy {
 
-    @Override
-    public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-        ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
-        UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
-        throws IOException {
+    @Override public void init(Configuration conf, FileSystem fs, Path walDir,
+      ReplicationSourceManager manager, ReplicationQueueStorage rq, ReplicationPeer rp,
+      Server server, String peerClusterId, UUID clusterId,
+      WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
       throw new IOException("Failing deliberately");
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 63e7a8b..9410604 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -369,23 +369,27 @@ public class TestWALEntryStream {
   }
 
   private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) {
-    ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
-    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
-    when(mockSourceManager.getTotalBufferLimit()).thenReturn(
-        (long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
     Server mockServer = Mockito.mock(Server.class);
     ReplicationSource source = Mockito.mock(ReplicationSource.class);
-    when(source.getSourceManager()).thenReturn(mockSourceManager);
     when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
     when(source.getWALFileLengthProvider()).thenReturn(log);
     when(source.getServer()).thenReturn(mockServer);
     when(source.isRecovered()).thenReturn(recovered);
-    MetricsReplicationGlobalSourceSource globalMetrics = Mockito.mock(
-        MetricsReplicationGlobalSourceSource.class);
-    when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
+    source.manager = mockReplicationSourceManager();
     return source;
   }
 
+  private ReplicationSourceManager mockReplicationSourceManager() {
+    ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
+    MetricsReplicationGlobalSourceSource globalMetrics =
+      Mockito.mock(MetricsReplicationGlobalSourceSource.class);
+    when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
+    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    when(mockSourceManager.getTotalBufferLimit())
+      .thenReturn((long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+    return mockSourceManager;
+  }
+
   private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {
     ReplicationSource source = mockReplicationSource(recovered, conf);
     when(source.isPeerEnabled()).thenReturn(true);


[hbase] 06/07: HBASE-24998 Introduce a ReplicationSourceOverallController interface and decouple ReplicationSourceManager and ReplicationSource (#2364)

Posted by zg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 17621c871def9af08844c124886fb22b8c1fcfcf
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Sun Sep 20 09:02:53 2020 +0800

    HBASE-24998 Introduce a ReplicationSourceOverallController interface and decouple ReplicationSourceManager and ReplicationSource (#2364)
    
    Signed-off-by: meiyi <my...@gmail.com>
---
 .../java/org/apache/hadoop/hbase/HConstants.java   |   2 +
 .../hbase/replication/ReplicationListener.java     |   2 +-
 .../replication/ReplicationSourceController.java   |  31 +++--
 .../regionserver/RecoveredReplicationSource.java   |  18 ++-
 .../regionserver/ReplicationSource.java            |  31 ++---
 .../regionserver/ReplicationSourceInterface.java   |  25 ++--
 .../regionserver/ReplicationSourceManager.java     | 141 +++++++++++----------
 .../regionserver/ReplicationSourceWALReader.java   |  13 +-
 .../hbase/replication/ReplicationSourceDummy.java  |  21 ++-
 .../regionserver/TestReplicationSourceManager.java |  11 +-
 .../regionserver/TestWALEntryStream.java           |  15 ++-
 11 files changed, 167 insertions(+), 143 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 5b4b6fb..f61558d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -979,6 +979,8 @@ public final class HConstants {
   /*
    * cluster replication constants.
    */
+  public static final String REPLICATION_OFFLOAD_ENABLE_KEY = "hbase.replication.offload.enabled";
+  public static final boolean REPLICATION_OFFLOAD_ENABLE_DEFAULT = false;
   public static final String
       REPLICATION_SOURCE_SERVICE_CLASSNAME = "hbase.replication.source.service";
   public static final String REPLICATION_SERVICE_CLASSNAME_DEFAULT =
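
The new key/default pair added here follows the usual HConstants convention of a KEY constant plus a DEFAULT constant. As a hedged sketch of how such a flag is read through the standard Hadoop Configuration API; the service wiring that consumes it is not part of this hunk, and the class name below is illustrative:

import org.apache.hadoop.conf.Configuration;

public class OffloadFlagSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Falls back to the DEFAULT constant's value (false) when the key is unset.
    boolean offloadEnabled = conf.getBoolean("hbase.replication.offload.enabled", false);
    System.out.println("replication offload enabled: " + offloadEnabled);
  }
}
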
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
index f040bf9..6ecbb46 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
@@ -33,5 +33,5 @@ public interface ReplicationListener {
    * A region server has been removed from the local cluster
    * @param regionServer the removed region server
    */
-  public void regionServerRemoved(String regionServer);
+  void regionServerRemoved(String regionServer);
 }
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceController.java
similarity index 50%
copy from hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
copy to hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceController.java
index f040bf9..5bb9dd6 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceController.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,20 +17,32 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
+import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
- * The replication listener interface can be implemented if a class needs to subscribe to events
- * generated by the ReplicationTracker. These events include things like addition/deletion of peer
- * clusters or failure of a local region server. To receive events, the class also needs to register
- * itself with a Replication Tracker.
+ * Controls all replication sources inside one RegionServer or ReplicationServer; used by
+ * {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSource} and
+ * {@link RecoveredReplicationSource}.
  */
 @InterfaceAudience.Private
-public interface ReplicationListener {
+public interface ReplicationSourceController {
+
+  /**
+   * Returns the maximum size in bytes of edits held in memory which are pending replication
+   * across all sources inside this RegionServer or ReplicationServer.
+   */
+  long getTotalBufferLimit();
+
+  AtomicLong getTotalBufferUsed();
+
+  MetricsReplicationGlobalSourceSource getGlobalMetrics();
 
   /**
-   * A region server has been removed from the local cluster
-   * @param regionServer the removed region server
+   * Call this when the recovered replication source has replicated all of its WALs.
    */
-  public void regionServerRemoved(String regionServer);
+  void finishRecoveredSource(RecoveredReplicationSource src);
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 62685ee..e0b626c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -45,18 +46,15 @@ public class RecoveredReplicationSource extends ReplicationSource {
 
   private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class);
 
-  private Path walDir;
-
   private String actualPeerId;
 
   @Override
-  public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
-    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
-    String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
-    MetricsSource metrics) throws IOException {
-    super.init(conf, fs, walDir, manager, queueStorage, replicationPeer, server, peerClusterZnode,
-      clusterId, walFileLengthProvider, metrics);
-    this.walDir = walDir;
+  public void init(Configuration conf, FileSystem fs, Path walDir,
+    ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
+    ReplicationPeer replicationPeer, Server server, String peerClusterZnode, UUID clusterId,
+    WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
+    super.init(conf, fs, walDir, overallController, queueStorage, replicationPeer, server,
+      peerClusterZnode, clusterId, walFileLengthProvider, metrics);
     this.actualPeerId = this.replicationQueueInfo.getPeerId();
   }
 
@@ -149,7 +147,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
   void tryFinish() {
     if (workerThreads.isEmpty()) {
       this.getSourceMetrics().clear();
-      manager.finishRecoveredSource(this);
+      controller.finishRecoveredSource(this);
     }
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index dc7f1a5..879c604 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
@@ -101,8 +102,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
   protected Configuration conf;
   protected ReplicationQueueInfo replicationQueueInfo;
 
-  // The manager of all sources to which we ping back our progress
-  ReplicationSourceManager manager;
+  protected Path walDir;
+
+  protected ReplicationSourceController controller;
   // Should we stop everything?
   protected Server server;
   // How long should we sleep for each retry
@@ -187,23 +189,14 @@ public class ReplicationSource implements ReplicationSourceInterface {
     this.baseFilterOutWALEntries = Collections.unmodifiableList(baseFilterOutWALEntries);
   }
 
-  /**
-   * Instantiation method used by region servers
-   * @param conf configuration to use
-   * @param fs file system to use
-   * @param manager replication manager to ping to
-   * @param server the server for this region server
-   * @param queueId the id of our replication queue
-   * @param clusterId unique UUID for the cluster
-   * @param metrics metrics for replication source
-   */
   @Override
-  public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
-      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
-      String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
-      MetricsSource metrics) throws IOException {
+  public void init(Configuration conf, FileSystem fs, Path walDir,
+    ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
+    ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId,
+    WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
     this.server = server;
     this.conf = HBaseConfiguration.create(conf);
+    this.walDir = walDir;
     this.waitOnEndpointSeconds =
       this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
     decorateConf();
@@ -214,7 +207,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
     this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
     this.queueStorage = queueStorage;
     this.replicationPeer = replicationPeer;
-    this.manager = manager;
+    this.controller = overallController;
     this.fs = fs;
     this.metrics = metrics;
     this.clusterId = clusterId;
@@ -783,9 +776,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
       throttler.addPushSize(batchSize);
     }
     totalReplicatedEdits.addAndGet(entries.size());
-    long newBufferUsed = manager.getTotalBufferUsed().addAndGet(-batchSize);
+    long newBufferUsed = controller.getTotalBufferUsed().addAndGet(-batchSize);
     // Record the new buffer usage
-    this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+    controller.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
   }
 
   @Override
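
The call sites above pin down the contract that replaces the direct ReplicationSourceManager
reference. A minimal sketch of that contract, assuming only the methods exercised in this diff
(the real interface is introduced by HBASE-24998 and may carry more):

    import java.util.concurrent.atomic.AtomicLong;
    import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
    import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
    import org.apache.yetus.audience.InterfaceAudience;

    @InterfaceAudience.Private
    public interface ReplicationSourceController {
      /** Shared counter of edit bytes currently buffered by all sources on this server. */
      AtomicLong getTotalBufferUsed();

      /** Upper bound, in bytes, on edits buffered across all sources. */
      long getTotalBufferLimit();

      /** Global source metrics, used above to record the new buffer usage. */
      MetricsReplicationGlobalSourceSource getGlobalMetrics();

      /** Callback from a recovered source once its queue has been fully drained. */
      void finishRecoveredSource(RecoveredReplicationSource src);
    }
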
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 321edc2..f3bf8a4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -44,14 +45,22 @@ public interface ReplicationSourceInterface {
   /**
    * Initializer for the source
    *
-   * @param conf   the configuration to use
-   * @param fs     the file system to use
-   * @param server the server for this region server
-   */
-  void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
-    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
-    String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
-    MetricsSource metrics) throws IOException;
+   * @param conf configuration to use
+   * @param fs file system to use
+   * @param walDir the directory where the WAL is located
+   * @param overallController the overall controller of all replication sources
+   * @param queueStorage the replication queue storage
+   * @param replicationPeer the replication peer
+   * @param server the server which starts and runs this replication source
+   * @param queueId the id of our replication queue
+   * @param clusterId unique UUID for the cluster
+   * @param walFileLengthProvider used to get the WAL length
+   * @param metrics metrics for this replication source
+   */
+  void init(Configuration conf, FileSystem fs, Path walDir,
+    ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
+    ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId,
+    WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException;
 
   /**
    * Add a log to the list of logs to replicate
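
For orientation, a hedged sketch of how a caller wires the widened init(...); only the parameter
list comes from the interface above, the surrounding factory-style helper is hypothetical:

    /**
     * Sketch only: create a source and initialize it against an arbitrary controller.
     * All parameters pass straight through; startup() is deliberately deferred (see the
     * offload gating in ReplicationSourceManager below).
     */
    ReplicationSourceInterface createAndInit(Configuration conf, FileSystem fs, Path walDir,
        ReplicationSourceController controller, ReplicationQueueStorage queueStorage,
        ReplicationPeer peer, Server server, String queueId, UUID clusterId,
        WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
      ReplicationSourceInterface src = new ReplicationSource();
      src.init(conf, fs, walDir, controller, queueStorage, peer, server, queueId, clusterId,
          walFileLengthProvider, metrics);
      return src;
    }
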
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 3212697..de9e21f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -92,7 +93,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
  * </ul>
  */
 @InterfaceAudience.Private
-public class ReplicationSourceManager implements ReplicationListener {
+public class ReplicationSourceManager implements ReplicationListener, ReplicationSourceController {
   private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class);
   // all the sources that read this RS's logs and every peer only has one replication source
   private final ConcurrentMap<String, ReplicationSourceInterface> sources;
@@ -126,12 +127,6 @@ public class ReplicationSourceManager implements ReplicationListener {
 
   private AtomicLong totalBufferUsed = new AtomicLong();
 
-  // How long should we sleep for each retry when deleting remote wal files for sync replication
-  // peer.
-  private final long sleepForRetries;
-  // Maximum number of retries before taking bold actions when deleting remote wal files for sync
-  // replication peer.
-  private final int maxRetriesMultiplier;
   // Total buffer size on this RegionServer for holding batched edits to be shipped.
   private final long totalBufferLimit;
   private final MetricsReplicationGlobalSourceSource globalMetrics;
@@ -139,6 +134,12 @@ public class ReplicationSourceManager implements ReplicationListener {
   private final Map<String, MetricsSource> sourceMetrics = new HashMap<>();
 
   /**
+   * When replication offload is enabled, do not create replication sources; only write WALs to
+   * the replication queue storage. The replication sources will be started by a ReplicationServer.
+   */
+  private final boolean replicationOffload;
+
+  /**
    * Creates a replication manager and sets the watch on all the other registered region servers
    * @param queueStorage the interface for manipulating replication queues
    * @param replicationPeers
@@ -186,12 +187,11 @@ public class ReplicationSourceManager implements ReplicationListener {
     this.latestPaths = new HashMap<>();
     this.replicationForBulkLoadDataEnabled = conf.getBoolean(
       HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
-    this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000);
-    this.maxRetriesMultiplier =
-      this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60);
     this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
         HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
     this.globalMetrics = globalMetrics;
+    this.replicationOffload = conf.getBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY,
+      HConstants.REPLICATION_OFFLOAD_ENABLE_DEFAULT);
   }
 
   /**
@@ -212,6 +212,47 @@ public class ReplicationSourceManager implements ReplicationListener {
     return this.executor.submit(this::adoptAbandonedQueues);
   }
 
+  @VisibleForTesting
+  @Override
+  public AtomicLong getTotalBufferUsed() {
+    return totalBufferUsed;
+  }
+
+  @Override
+  public long getTotalBufferLimit() {
+    return totalBufferLimit;
+  }
+
+  @Override
+  public void finishRecoveredSource(RecoveredReplicationSource src) {
+    synchronized (oldsources) {
+      if (!removeRecoveredSource(src)) {
+        return;
+      }
+    }
+    LOG.info("Finished recovering queue {} with the following stats: {}", src.getQueueId(),
+      src.getStats());
+  }
+
+  @Override
+  public MetricsReplicationGlobalSourceSource getGlobalMetrics() {
+    return this.globalMetrics;
+  }
+
+  /**
+   * Clear the metrics and related replication queue of the specified old source
+   * @param src source to clear
+   */
+  private boolean removeRecoveredSource(ReplicationSourceInterface src) {
+    if (!this.oldsources.remove(src)) {
+      return false;
+    }
+    LOG.info("Done with the recovered queue {}", src.getQueueId());
+    // Delete queue from storage and memory
+    deleteQueue(src.getQueueId());
+    return true;
+  }
+
   private void adoptAbandonedQueues() {
     List<ServerName> currentReplicators = null;
     try {
@@ -331,8 +372,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @param peerId the id of the replication peer
    * @return the source that was created
    */
-  @VisibleForTesting
-  ReplicationSourceInterface addSource(String peerId) throws IOException {
+  void addSource(String peerId) throws IOException {
     ReplicationPeer peer = replicationPeers.getPeer(peerId);
     ReplicationSourceInterface src = createSource(peerId, peer);
     // synchronized on latestPaths to avoid missing the new log
@@ -354,8 +394,9 @@ public class ReplicationSourceManager implements ReplicationListener {
     if (peerConfig.isSyncReplication()) {
       syncReplicationPeerMappingManager.add(peer.getId(), peerConfig);
     }
-    src.startup();
-    return src;
+    if (!replicationOffload) {
+      src.startup();
+    }
   }
 
   /**
@@ -373,7 +414,11 @@ public class ReplicationSourceManager implements ReplicationListener {
    * </p>
    * @param peerId the id of the sync replication peer
    */
-  public void drainSources(String peerId) throws IOException, ReplicationException {
+  void drainSources(String peerId) throws IOException, ReplicationException {
+    if (replicationOffload) {
+      throw new ReplicationException(
+        "Should not add use sync replication when replication offload enabled");
+    }
     String terminateMessage = "Sync replication peer " + peerId +
       " is transiting to STANDBY. Will close the previous replication source and open a new one";
     ReplicationPeer peer = replicationPeers.getPeer(peerId);
@@ -430,7 +475,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * replication queue storage and only to enqueue all logs to the new replication source
    * @param peerId the id of the replication peer
    */
-  public void refreshSources(String peerId) throws ReplicationException, IOException {
+  void refreshSources(String peerId) throws ReplicationException, IOException {
     String terminateMessage = "Peer " + peerId +
       " state or config changed. Will close the previous replication source and open a new one";
     ReplicationPeer peer = replicationPeers.getPeer(peerId);
@@ -447,7 +492,9 @@ public class ReplicationSourceManager implements ReplicationListener {
         .forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
     }
     LOG.info("Startup replication source for " + src.getPeerId());
-    src.startup();
+    if (!replicationOffload) {
+      src.startup();
+    }
 
     List<ReplicationSourceInterface> toStartup = new ArrayList<>();
     // synchronized on oldsources to avoid race with NodeFailoverWorker
@@ -470,41 +517,18 @@ public class ReplicationSourceManager implements ReplicationListener {
         toStartup.add(recoveredReplicationSource);
       }
     }
-    for (ReplicationSourceInterface replicationSource : toStartup) {
-      replicationSource.startup();
-    }
-  }
-
-  /**
-   * Clear the metrics and related replication queue of the specified old source
-   * @param src source to clear
-   */
-  private boolean removeRecoveredSource(ReplicationSourceInterface src) {
-    if (!this.oldsources.remove(src)) {
-      return false;
-    }
-    LOG.info("Done with the recovered queue {}", src.getQueueId());
-    // Delete queue from storage and memory
-    deleteQueue(src.getQueueId());
-    return true;
-  }
-
-  void finishRecoveredSource(ReplicationSourceInterface src) {
-    synchronized (oldsources) {
-      if (!removeRecoveredSource(src)) {
-        return;
+    if (!replicationOffload) {
+      for (ReplicationSourceInterface replicationSource : toStartup) {
+        replicationSource.startup();
       }
     }
-    LOG.info("Finished recovering queue {} with the following stats: {}", src.getQueueId(),
-      src.getStats());
   }
 
   /**
    * Clear the metrics and related replication queue of the specified old source
    * @param src source to clear
    */
-  void removeSource(ReplicationSourceInterface src) {
-    LOG.info("Done with the queue " + src.getQueueId());
+  private void removeSource(ReplicationSourceInterface src) {
     this.sources.remove(src.getPeerId());
     // Delete queue from storage and memory
     deleteQueue(src.getQueueId());
@@ -548,8 +572,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     }
   }
 
-  // public because of we call it in TestReplicationEmptyWALRecovery
-  @VisibleForTesting
+  @InterfaceAudience.Private
   public void preLogRoll(Path newLog) throws IOException {
     String logName = newLog.getName();
     String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
@@ -567,9 +590,8 @@ public class ReplicationSourceManager implements ReplicationListener {
     }
   }
 
-  // public because of we call it in TestReplicationEmptyWALRecovery
-  @VisibleForTesting
-  public void postLogRoll(Path newLog) throws IOException {
+  @InterfaceAudience.Private
+  public void postLogRoll(Path newLog) {
     // This only updates the sources we own, not the recovered ones
     for (ReplicationSourceInterface source : this.sources.values()) {
       source.enqueueLog(newLog);
@@ -739,7 +761,9 @@ public class ReplicationSourceManager implements ReplicationListener {
               LOG.trace("Enqueueing log from recovered queue for source: " + src.getQueueId());
               src.enqueueLog(new Path(oldLogDir, wal));
             }
-            src.startup();
+            if (!replicationOffload) {
+              src.startup();
+            }
           }
         } catch (IOException e) {
           // TODO manage it
@@ -849,19 +873,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     }
   }
 
-  @VisibleForTesting
-  public AtomicLong getTotalBufferUsed() {
-    return totalBufferUsed;
-  }
-
-  /**
-   * Returns the maximum size in bytes of edits held in memory which are pending replication
-   * across all sources inside this RegionServer.
-   */
-  public long getTotalBufferLimit() {
-    return totalBufferLimit;
-  }
-
   /**
    * Get the directory where wals are archived
    * @return the directory where wals are archived
@@ -967,10 +978,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     return executor.getActiveCount();
   }
 
-  MetricsReplicationGlobalSourceSource getGlobalMetrics() {
-    return this.globalMetrics;
-  }
-
   @InterfaceAudience.Private
   Server getServer() {
     return this.server;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 65afe48..439915e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -268,10 +268,11 @@ class ReplicationSourceWALReader extends Thread {
   //returns false if we've already exceeded the global quota
   private boolean checkQuota() {
     // try not to go over total quota
-    if (source.manager.getTotalBufferUsed().get() > source.manager.getTotalBufferLimit()) {
+    if (source.controller.getTotalBufferUsed().get() > source.controller
+      .getTotalBufferLimit()) {
       LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
-        this.source.getPeerId(), source.manager.getTotalBufferUsed().get(),
-        source.manager.getTotalBufferLimit());
+        this.source.getPeerId(), source.controller.getTotalBufferUsed().get(),
+        source.controller.getTotalBufferLimit());
       Threads.sleep(sleepForRetries);
       return false;
     }
@@ -400,10 +401,10 @@ class ReplicationSourceWALReader extends Thread {
    * @return true if we should clear buffer and push all
    */
   private boolean acquireBufferQuota(long size) {
-    long newBufferUsed = source.manager.getTotalBufferUsed().addAndGet(size);
+    long newBufferUsed = source.controller.getTotalBufferUsed().addAndGet(size);
     // Record the new buffer usage
-    source.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
-    return newBufferUsed >= source.manager.getTotalBufferLimit();
+    source.controller.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+    return newBufferUsed >= source.controller.getTotalBufferLimit();
   }
 
   /**
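
The quota logic above is plain shared-counter arithmetic. A self-contained model of the three
operations (checkQuota, acquireBufferQuota, and the release in ReplicationSource.postShipEdits),
with names local to this sketch:

    import java.util.concurrent.atomic.AtomicLong;

    final class BufferQuotaModel {
      private final AtomicLong used = new AtomicLong();
      private final long limit;

      BufferQuotaModel(long limit) {
        this.limit = limit;
      }

      /** Mirrors checkQuota(): stop reading WAL entries once usage exceeds the limit. */
      boolean canRead() {
        return used.get() <= limit;
      }

      /** Mirrors acquireBufferQuota(size): reserve bytes; true means "ship the batch now". */
      boolean acquire(long size) {
        return used.addAndGet(size) >= limit;
      }

      /** Mirrors the post-ship release: give bytes back and report the new usage. */
      long release(long size) {
        return used.addAndGet(-size);
      }
    }
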
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index b75a7ed..66059c7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.apache.hadoop.hbase.replication.regionserver.WALEntryBatch;
 import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -40,21 +39,21 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
 public class ReplicationSourceDummy implements ReplicationSourceInterface {
 
   private ReplicationPeer replicationPeer;
-  private String peerClusterId;
+  private String queueId;
   private Path currentPath;
   private MetricsSource metrics;
   private WALFileLengthProvider walFileLengthProvider;
   private AtomicBoolean startup = new AtomicBoolean(false);
 
   @Override
-  public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
-    ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
-    UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
-    throws IOException {
-    this.peerClusterId = peerClusterId;
+  public void init(Configuration conf, FileSystem fs, Path walDir,
+    ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
+    ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId,
+    WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
+    this.queueId = queueId;
     this.metrics = metrics;
     this.walFileLengthProvider = walFileLengthProvider;
-    this.replicationPeer = rp;
+    this.replicationPeer = replicationPeer;
   }
 
   @Override
@@ -96,14 +95,14 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
 
   @Override
   public String getQueueId() {
-    return peerClusterId;
+    return queueId;
   }
 
   @Override
   public String getPeerId() {
-    String[] parts = peerClusterId.split("-", 2);
+    String[] parts = queueId.split("-", 2);
     return parts.length != 1 ?
-        parts[0] : peerClusterId;
+        parts[0] : queueId;
   }
 
   @Override
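
The dummy's getPeerId() captures the queue-id convention in two lines: a recovered queue id is
the peer id plus a '-'-separated suffix, while a fresh queue id is the peer id itself. A
standalone illustration:

    public class QueueIdParsing {
      /** Extract the peer id from a replication queue id, as ReplicationSourceDummy does. */
      static String peerIdOf(String queueId) {
        String[] parts = queueId.split("-", 2);
        return parts.length != 1 ? parts[0] : queueId;
      }

      public static void main(String[] args) {
        System.out.println(peerIdOf("1"));                            // "1" (fresh queue)
        System.out.println(peerIdOf("1-host1,16020,1596000000000"));  // "1" (recovered queue)
      }
    }
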
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 4b685ce..0e0353f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -43,6 +43,7 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.stream.Collectors;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -72,6 +73,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
@@ -816,10 +818,11 @@ public abstract class TestReplicationSourceManager {
 
   static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy {
 
-    @Override public void init(Configuration conf, FileSystem fs, Path walDir,
-      ReplicationSourceManager manager, ReplicationQueueStorage rq, ReplicationPeer rp,
-      Server server, String peerClusterId, UUID clusterId,
-      WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
+    @Override
+    public void init(Configuration conf, FileSystem fs, Path walDir,
+      ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
+      ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId,
+      WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
       throw new IOException("Failing deliberately");
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 9410604..bafabb0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.hadoop.hbase.replication.ReplicationSourceController;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -375,19 +376,19 @@ public class TestWALEntryStream {
     when(source.getWALFileLengthProvider()).thenReturn(log);
     when(source.getServer()).thenReturn(mockServer);
     when(source.isRecovered()).thenReturn(recovered);
-    source.manager = mockReplicationSourceManager();
+    source.controller = mockReplicationSourceController();
     return source;
   }
 
-  private ReplicationSourceManager mockReplicationSourceManager() {
-    ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
+  private ReplicationSourceController mockReplicationSourceController() {
+    ReplicationSourceController controller = Mockito.mock(ReplicationSourceController.class);
     MetricsReplicationGlobalSourceSource globalMetrics =
       Mockito.mock(MetricsReplicationGlobalSourceSource.class);
-    when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
-    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
-    when(mockSourceManager.getTotalBufferLimit())
+    when(controller.getGlobalMetrics()).thenReturn(globalMetrics);
+    when(controller.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    when(controller.getTotalBufferLimit())
       .thenReturn((long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
-    return mockSourceManager;
+    return controller;
   }
 
   private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {


[hbase] 04/07: HBASE-24683 Add a basic ReplicationServer which only implement ReplicationSink Service (#2111)

Posted by zg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 6625e921273b92fe75c9e1dcbc836ae0e7212d36
Author: XinSun <dd...@gmail.com>
AuthorDate: Fri Sep 4 18:53:46 2020 +0800

    HBASE-24683 Add a basic ReplicationServer which only implement ReplicationSink Service (#2111)
    
    Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
 .../java/org/apache/hadoop/hbase/util/DNS.java     |   3 +-
 .../hbase/replication/HReplicationServer.java      | 391 ++++++++++++++++
 .../replication/ReplicationServerRpcServices.java  | 516 +++++++++++++++++++++
 .../hbase/replication/TestReplicationServer.java   | 151 ++++++
 4 files changed, 1060 insertions(+), 1 deletion(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DNS.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DNS.java
index 2b4e1cb..ddff6db 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DNS.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DNS.java
@@ -54,7 +54,8 @@ public final class DNS {
 
   public enum ServerType {
     MASTER("master"),
-    REGIONSERVER("regionserver");
+    REGIONSERVER("regionserver"),
+    REPLICATIONSERVER("replicationserver");
 
     private String name;
     ServerType(String name) {
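
The new constant plugs into the existing per-server-type hostname lookup. A hedged usage sketch
(DNS.getHostname with a ServerType is exactly the call made later in ReplicationServerRpcServices;
the dns.interface/nameserver key pattern in the comment is an assumption):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.util.DNS;
    import org.apache.hadoop.hbase.util.DNS.ServerType;

    public class ResolveReplicationServerHost {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Presumably consults hbase.replicationserver.dns.interface / .nameserver when
        // set, falling back to the local host name otherwise.
        String hostname = DNS.getHostname(conf, ServerType.REPLICATIONSERVER);
        System.out.println(hostname);
      }
    }
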
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
new file mode 100644
index 0000000..31dec0c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
@@ -0,0 +1,391 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.regionserver.ReplicationService;
+import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.trace.TraceUtil;
+import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HReplicationServer is responsible for all replication work. It checks in with
+ * the HMaster. There are many HReplicationServers in a single HBase deployment.
+ */
+@InterfaceAudience.Private
+@SuppressWarnings({ "deprecation"})
+public class HReplicationServer extends Thread implements Server {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HReplicationServer.class);
+
+  /** replication server process name */
+  public static final String REPLICATION_SERVER = "replicationserver";
+
+  /**
+   * This server's start code.
+   */
+  protected final long startCode;
+
+  private volatile boolean stopped = false;
+
+  // Go down hard. Used if file system becomes unavailable and also in
+  // debugging and unit tests.
+  private AtomicBoolean abortRequested;
+
+  // flag set after we're done setting up server threads
+  final AtomicBoolean online = new AtomicBoolean(false);
+
+  /**
+   * The server name the Master sees us as. It's made from the hostname the
+   * master passes us, port, and server start code. Gets set after registration
+   * against Master.
+   */
+  private ServerName serverName;
+
+  protected final Configuration conf;
+
+  private ReplicationSinkService replicationSinkService;
+
+  final int msgInterval;
+  // A sleeper that sleeps for msgInterval.
+  protected final Sleeper sleeper;
+
+  // zookeeper connection and watcher
+  protected final ZKWatcher zooKeeper;
+
+  /**
+   * The asynchronous cluster connection to be shared by services.
+   */
+  protected AsyncClusterConnection asyncClusterConnection;
+
+  private UserProvider userProvider;
+
+  protected final ReplicationServerRpcServices rpcServices;
+
+  public HReplicationServer(final Configuration conf) throws IOException {
+    TraceUtil.initTracer(conf);
+    try {
+      this.startCode = System.currentTimeMillis();
+      this.conf = conf;
+
+      this.abortRequested = new AtomicBoolean(false);
+
+      this.rpcServices = createRpcServices();
+
+      String hostName = this.rpcServices.isa.getHostName();
+      serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startCode);
+
+      this.userProvider = UserProvider.instantiate(conf);
+
+      this.msgInterval = conf.getInt("hbase.replicationserver.msginterval", 3 * 1000);
+      this.sleeper = new Sleeper(this.msgInterval, this);
+
+      // Some unit tests don't need a cluster, so no zookeeper at all
+      if (!conf.getBoolean("hbase.testing.nocluster", false)) {
+        // Open connection to zookeeper and set primary watcher
+        zooKeeper = new ZKWatcher(conf, getProcessName() + ":" +
+            rpcServices.isa.getPort(), this, false);
+      } else {
+        zooKeeper = null;
+      }
+
+      this.rpcServices.start(zooKeeper);
+    } catch (Throwable t) {
+      // Make sure we log the exception. HReplicationServer is often started via reflection and the
+      // cause of failed startup is lost.
+      LOG.error("Failed construction ReplicationServer", t);
+      throw t;
+    }
+  }
+
+  public String getProcessName() {
+    return REPLICATION_SERVER;
+  }
+
+  @Override
+  public void run() {
+    if (isStopped()) {
+      LOG.info("Skipping run; stopped");
+      return;
+    }
+    try {
+      // Do pre-registration initializations; zookeeper, lease threads, etc.
+      preRegistrationInitialization();
+    } catch (Throwable e) {
+      abort("Fatal exception during initialization", e);
+    }
+    try {
+      setupReplication();
+      startReplicationService();
+
+      online.set(true);
+
+      long lastMsg = System.currentTimeMillis();
+      // The main run loop.
+      while (!isStopped()) {
+        long now = System.currentTimeMillis();
+        if ((now - lastMsg) >= msgInterval) {
+          lastMsg = System.currentTimeMillis();
+        }
+        if (!isStopped() && !isAborted()) {
+          this.sleeper.sleep();
+        }
+      }
+
+      stopServiceThreads();
+
+      if (this.rpcServices != null) {
+        this.rpcServices.stop();
+      }
+    } catch (Throwable t) {
+      abort(t.getMessage(), t);
+    }
+
+    if (this.zooKeeper != null) {
+      this.zooKeeper.close();
+    }
+    LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
+  }
+
+  private Configuration cleanupConfiguration() {
+    Configuration conf = this.conf;
+    conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+        HConstants.ZK_CONNECTION_REGISTRY_CLASS);
+    if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
+      // Use server ZK cluster for server-issued connections, so we clone
+      // the conf and unset the client ZK related properties
+      conf = new Configuration(this.conf);
+      conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
+    }
+    return conf;
+  }
+
+  /**
+   * All initialization needed before we go register with Master.<br>
+   * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
+   * In here we just put up the RpcServer, setup Connection, and ZooKeeper.
+   */
+  private void preRegistrationInitialization() {
+    try {
+      setupClusterConnection();
+    } catch (Throwable t) {
+      // Call stop on error, or the process will stick around forever, since the server
+      // puts up non-daemon threads.
+      this.rpcServices.stop();
+      abort("Initialization of RS failed.  Hence aborting RS.", t);
+    }
+  }
+
+  /**
+   * Setup our cluster connection if not already initialized.
+   */
+  protected final synchronized void setupClusterConnection() throws IOException {
+    if (asyncClusterConnection == null) {
+      Configuration conf = cleanupConfiguration();
+      InetSocketAddress localAddress = new InetSocketAddress(this.rpcServices.isa.getAddress(), 0);
+      User user = userProvider.getCurrent();
+      asyncClusterConnection =
+          ClusterConnectionFactory.createAsyncClusterConnection(conf, localAddress, user);
+    }
+  }
+
+  /**
+   * Wait on all threads to finish. Presumption is that all closes and stops
+   * have already been called.
+   */
+  protected void stopServiceThreads() {
+    if (this.replicationSinkService != null) {
+      this.replicationSinkService.stopReplicationService();
+    }
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  @Override
+  public ZKWatcher getZooKeeper() {
+    return zooKeeper;
+  }
+
+  @Override
+  public Connection getConnection() {
+    return getAsyncConnection().toConnection();
+  }
+
+  @Override
+  public Connection createConnection(Configuration conf) throws IOException {
+    throw new DoNotRetryIOException(new UnsupportedOperationException("This is a replication server."));
+  }
+
+  @Override
+  public AsyncClusterConnection getAsyncClusterConnection() {
+    return this.asyncClusterConnection;
+  }
+
+  @Override
+  public ServerName getServerName() {
+    return serverName;
+  }
+
+  @Override
+  public CoordinatedStateManager getCoordinatedStateManager() {
+    return null;
+  }
+
+  @Override
+  public ChoreService getChoreService() {
+    return null;
+  }
+
+  @Override
+  public void abort(String why, Throwable cause) {
+    if (!setAbortRequested()) {
+      // Abort already in progress, ignore the new request.
+      LOG.debug(
+          "Abort already in progress. Ignoring the current request with reason: {}", why);
+      return;
+    }
+    String msg = "***** ABORTING replication server " + this + ": " + why + " *****";
+    if (cause != null) {
+      LOG.error(HBaseMarkers.FATAL, msg, cause);
+    } else {
+      LOG.error(HBaseMarkers.FATAL, msg);
+    }
+    stop(why);
+  }
+
+  @Override
+  public boolean isAborted() {
+    return abortRequested.get();
+  }
+
+  @Override
+  public void stop(final String msg) {
+    if (!this.stopped) {
+      LOG.info("***** STOPPING region server '" + this + "' *****");
+      this.stopped = true;
+      LOG.info("STOPPED: " + msg);
+      // Wakes run() if it is sleeping
+      sleeper.skipSleepCycle();
+    }
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stopped;
+  }
+
+  /**
+   * Set up replication if enabled. Replication setup is done in here because it needs to
+   * be hooked up to the WAL.
+   */
+  private void setupReplication() throws IOException {
+    // Instantiate replication if replication enabled. Pass it the log directories.
+    createNewReplicationInstance(conf, this);
+  }
+
+  /**
+   * Load the replication executorService objects, if any
+   */
+  private static void createNewReplicationInstance(Configuration conf, HReplicationServer server)
+      throws IOException {
+    // read in the name of the sink replication class from the config file.
+    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
+        HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
+
+    server.replicationSinkService = newReplicationInstance(sinkClassname,
+        ReplicationSinkService.class, conf, server);
+  }
+
+  private static <T extends ReplicationService> T newReplicationInstance(String classname,
+      Class<T> xface, Configuration conf, HReplicationServer server) throws IOException {
+    final Class<? extends T> clazz;
+    try {
+      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+      clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
+    } catch (java.lang.ClassNotFoundException nfe) {
+      throw new IOException("Could not find class for " + classname);
+    }
+    T service = ReflectionUtils.newInstance(clazz, conf);
+    service.initialize(server, null, null, null, null);
+    return service;
+  }
+
+  /**
+   * Start up replication source and sink handlers.
+   */
+  private void startReplicationService() throws IOException {
+    if (this.replicationSinkService != null) {
+      this.replicationSinkService.startReplicationService();
+    }
+  }
+
+  /**
+   * @return the object that implements the replication sink executorService.
+   */
+  public ReplicationSinkService getReplicationSinkService() {
+    return replicationSinkService;
+  }
+
+  /**
+   * Report the status of the server. A server is online once all of its startup is
+   * completed (setting up filesystem, starting executorService threads, etc.). This
+   * method is designed mostly to be useful in tests.
+   *
+   * @return true if online, false if not.
+   */
+  public boolean isOnline() {
+    return online.get();
+  }
+
+  protected ReplicationServerRpcServices createRpcServices() throws IOException {
+    return new ReplicationServerRpcServices(this);
+  }
+
+  /**
+   * Sets the abort state if not already set.
+   * @return True if abortRequested was set successfully; false if an abort was already in
+   * progress.
+   */
+  protected boolean setAbortRequested() {
+    return abortRequested.compareAndSet(false, true);
+  }
+}
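
HReplicationServer extends Thread and follows the usual HBase daemon lifecycle. A hedged launch
sketch for a test or scratch main(); the constructor, isOnline(), isStopped(), isAborted() and
stop(String) are from the class above, while the launcher itself is hypothetical (this commit
adds no main()):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.replication.HReplicationServer;

    public class ReplicationServerLauncher {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // The constructor puts up the RPC services and the ZooKeeper watcher.
        HReplicationServer server = new HReplicationServer(conf);
        server.start();  // Thread.start(): run() sets up replication and enters the main loop
        while (!server.isOnline() && !server.isStopped() && !server.isAborted()) {
          Thread.sleep(100);  // wait for startup to complete
        }
        // ... later:
        server.stop("shutting down");
        server.join();
      }
    }
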
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
new file mode 100644
index 0000000..1b9b699
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
@@ -0,0 +1,516 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.client.ConnectionUtils;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
+import org.apache.hadoop.hbase.ipc.QosPriority;
+import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
+import org.apache.hadoop.hbase.ipc.RpcServerFactory;
+import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.net.Address;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
+import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
+import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.access.AccessChecker;
+import org.apache.hadoop.hbase.security.access.NoopAccessChecker;
+import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
+import org.apache.hadoop.hbase.util.DNS;
+import org.apache.hadoop.hbase.util.DNS.ServerType;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+/**
+ * Implements the RPC services for {@link HReplicationServer}.
+ */
+@InterfaceAudience.Private
+@SuppressWarnings("deprecation")
+public class ReplicationServerRpcServices implements HBaseRPCErrorHandler,
+    AdminService.BlockingInterface, PriorityFunction {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(ReplicationServerRpcServices.class);
+
+  /** Parameter name for port replication server listens on. */
+  public static final String REPLICATION_SERVER_PORT = "hbase.replicationserver.port";
+
+  /** Default port replication server listens on. */
+  public static final int DEFAULT_REPLICATION_SERVER_PORT = 16040;
+
+  /** default port for replication server web api */
+  public static final int DEFAULT_REPLICATION_SERVER_INFOPORT = 16050;
+
+  // Request counter.
+  final LongAdder requestCount = new LongAdder();
+
+  // Server to handle client requests.
+  final RpcServerInterface rpcServer;
+  final InetSocketAddress isa;
+
+  protected final HReplicationServer replicationServer;
+
+  // The reference to the priority extraction function
+  private final PriorityFunction priority;
+
+  private AccessChecker accessChecker;
+  private ZKPermissionWatcher zkPermissionWatcher;
+
+  public ReplicationServerRpcServices(final HReplicationServer rs) throws IOException {
+    final Configuration conf = rs.getConfiguration();
+    replicationServer = rs;
+
+    final RpcSchedulerFactory rpcSchedulerFactory;
+    try {
+      rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class)
+          .getDeclaredConstructor().newInstance();
+    } catch (NoSuchMethodException | InvocationTargetException |
+        InstantiationException | IllegalAccessException e) {
+      throw new IllegalArgumentException(e);
+    }
+    // Server to handle client requests.
+    final InetSocketAddress initialIsa;
+    final InetSocketAddress bindAddress;
+
+    String hostname = DNS.getHostname(conf, ServerType.REPLICATIONSERVER);
+    int port = conf.getInt(REPLICATION_SERVER_PORT, DEFAULT_REPLICATION_SERVER_PORT);
+    // Creation of an HSA will force a resolve.
+    initialIsa = new InetSocketAddress(hostname, port);
+    bindAddress = new InetSocketAddress(
+        conf.get("hbase.replicationserver.ipc.address", hostname), port);
+
+    if (initialIsa.getAddress() == null) {
+      throw new IllegalArgumentException("Failed resolve of " + initialIsa);
+    }
+    priority = createPriority();
+    // Using Address means we don't get the IP too. Shorten it even more, to just the host name
+    // w/o the domain.
+    final String name = rs.getProcessName() + "/" +
+        Address.fromParts(initialIsa.getHostName(), initialIsa.getPort()).toStringWithoutDomain();
+    // Set how many times to retry talking to another server over Connection.
+    ConnectionUtils.setServerSideHConnectionRetriesConfig(conf, name, LOG);
+    rpcServer = createRpcServer(rs, rpcSchedulerFactory, bindAddress, name);
+
+    final InetSocketAddress address = rpcServer.getListenerAddress();
+    if (address == null) {
+      throw new IOException("Listener channel is closed");
+    }
+    // Set our address, however we need the final port that was given to rpcServer
+    isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
+    rpcServer.setErrorHandler(this);
+    rs.setName(name);
+  }
+
+  protected RpcServerInterface createRpcServer(
+      final Server server,
+      final RpcSchedulerFactory rpcSchedulerFactory,
+      final InetSocketAddress bindAddress,
+      final String name
+  ) throws IOException {
+    final Configuration conf = server.getConfiguration();
+    boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
+    try {
+      return RpcServerFactory.createRpcServer(server, name, getServices(),
+          bindAddress, // use final bindAddress for this server.
+          conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
+    } catch (BindException be) {
+      throw new IOException(be.getMessage() + ". To switch ports use the '"
+          + REPLICATION_SERVER_PORT + "' configuration property.",
+          be.getCause() != null ? be.getCause() : be);
+    }
+  }
+
+  protected Class<?> getRpcSchedulerFactoryClass() {
+    final Configuration conf = replicationServer.getConfiguration();
+    return conf.getClass(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
+      SimpleRpcSchedulerFactory.class);
+  }
+
+  public PriorityFunction getPriority() {
+    return priority;
+  }
+
+  public Configuration getConfiguration() {
+    return replicationServer.getConfiguration();
+  }
+
+  void start(ZKWatcher zkWatcher) {
+    if (AccessChecker.isAuthorizationSupported(getConfiguration())) {
+      accessChecker = new AccessChecker(getConfiguration());
+    } else {
+      accessChecker = new NoopAccessChecker(getConfiguration());
+    }
+    if (!getConfiguration().getBoolean("hbase.testing.nocluster", false) && zkWatcher != null) {
+      zkPermissionWatcher =
+          new ZKPermissionWatcher(zkWatcher, accessChecker.getAuthManager(), getConfiguration());
+      try {
+        zkPermissionWatcher.start();
+      } catch (KeeperException e) {
+        LOG.error("ZooKeeper permission watcher initialization failed", e);
+      }
+    }
+    rpcServer.start();
+  }
+
+  void stop() {
+    if (zkPermissionWatcher != null) {
+      zkPermissionWatcher.close();
+    }
+    rpcServer.stop();
+  }
+
+  /**
+   * By default, put up an Admin Service.
+   * @return immutable list of blocking services and the security info classes that this server
+   *   supports
+   */
+  protected List<BlockingServiceAndInterface> getServices() {
+    List<BlockingServiceAndInterface> bssi = new ArrayList<>();
+    bssi.add(new BlockingServiceAndInterface(
+      AdminService.newReflectiveBlockingService(this),
+      AdminService.BlockingInterface.class));
+    return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
+  }
+
+  public InetSocketAddress getSocketAddress() {
+    return isa;
+  }
+
+  @Override
+  public int getPriority(RequestHeader header, Message param, User user) {
+    return priority.getPriority(header, param, user);
+  }
+
+  @Override
+  public long getDeadline(RequestHeader header, Message param) {
+    return priority.getDeadline(header, param);
+  }
+
+  /*
+   * Check if an OOME and, if so, abort immediately to avoid creating more objects.
+   *
+   * @param e
+   *
+   * @return True if we OOME'd and are aborting.
+   */
+  @Override
+  public boolean checkOOME(final Throwable e) {
+    return exitIfOOME(e);
+  }
+
+  public static boolean exitIfOOME(final Throwable e) {
+    boolean stop = false;
+    try {
+      if (e instanceof OutOfMemoryError
+          || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
+          || (e.getMessage() != null && e.getMessage().contains(
+              "java.lang.OutOfMemoryError"))) {
+        stop = true;
+        LOG.error(HBaseMarkers.FATAL, "Run out of memory; "
+          + ReplicationServerRpcServices.class.getSimpleName() + " will abort itself immediately",
+          e);
+      }
+    } finally {
+      if (stop) {
+        Runtime.getRuntime().halt(1);
+      }
+    }
+    return stop;
+  }
+
+  /**
+   * Called to verify that this server is up and running.
+   */
+  protected void checkOpen() throws IOException {
+    if (replicationServer.isAborted()) {
+      throw new RegionServerAbortedException("Server " + replicationServer.getServerName()
+          + " aborting");
+    }
+    if (replicationServer.isStopped()) {
+      throw new RegionServerStoppedException("Server " + replicationServer.getServerName()
+          + " stopping");
+    }
+    if (!replicationServer.isOnline()) {
+      throw new ServerNotRunningYetException("Server " + replicationServer.getServerName()
+          + " is not running yet");
+    }
+  }
+
+  @Override
+  public GetRegionInfoResponse getRegionInfo(RpcController controller, GetRegionInfoRequest request)
+      throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
+  }
+
+  @Override
+  public GetStoreFileResponse getStoreFile(RpcController controller, GetStoreFileRequest request)
+      throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
+  }
+
+  @Override
+  public GetOnlineRegionResponse getOnlineRegion(RpcController controller,
+      GetOnlineRegionRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
+  }
+
+  @Override
+  public OpenRegionResponse openRegion(RpcController controller, OpenRegionRequest request)
+      throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
+  }
+
+  @Override
+  public WarmupRegionResponse warmupRegion(RpcController controller, WarmupRegionRequest request)
+      throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
+  }
+
+  @Override
+  public CloseRegionResponse closeRegion(RpcController controller, CloseRegionRequest request)
+      throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
+  }
+
+  @Override
+  public FlushRegionResponse flushRegion(RpcController controller, FlushRegionRequest request)
+      throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
+  }
+
+  @Override
+  public CompactionSwitchResponse compactionSwitch(RpcController controller,
+      CompactionSwitchRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
+  }
+
+  @Override
+  public CompactRegionResponse compactRegion(RpcController controller,
+      CompactRegionRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
+  }
+
+  @Override
+  public ReplicateWALEntryResponse replay(RpcController controller,
+      ReplicateWALEntryRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public RollWALWriterResponse rollWALWriter(RpcController controller, RollWALWriterRequest request)
+      throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public GetServerInfoResponse getServerInfo(RpcController controller, GetServerInfoRequest request)
+      throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  /**
+   * Stop the replication server.
+   *
+   * @param controller the RPC controller
+   * @param request the request
+   */
+  @Override
+  @QosPriority(priority=HConstants.ADMIN_QOS)
+  public StopServerResponse stopServer(final RpcController controller,
+      final StopServerRequest request) {
+    requestCount.increment();
+    String reason = request.getReason();
+    replicationServer.stop(reason);
+    return StopServerResponse.newBuilder().build();
+  }
+
+  @Override
+  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
+      UpdateFavoredNodesRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public UpdateConfigurationResponse updateConfiguration(RpcController controller,
+      UpdateConfigurationRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public GetRegionLoadResponse getRegionLoad(RpcController controller,
+      GetRegionLoadRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
+      ClearCompactionQueuesRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
+      ClearRegionBlockCacheRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
+      GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public ExecuteProceduresResponse executeProcedures(RpcController controller,
+      ExecuteProceduresRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public SlowLogResponses getSlowLogResponses(RpcController controller,
+      SlowLogResponseRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public SlowLogResponses getLargeLogResponses(RpcController controller,
+      SlowLogResponseRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public ClearSlowLogResponses clearSlowLogsResponses(RpcController controller,
+      ClearSlowLogResponseRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  protected AccessChecker getAccessChecker() {
+    return accessChecker;
+  }
+
+  protected PriorityFunction createPriority() {
+    return new PriorityFunction() {
+      @Override
+      public int getPriority(RequestHeader header, Message param, User user) {
+        return 0;
+      }
+
+      @Override
+      public long getDeadline(RequestHeader header, Message param) {
+        return 0;
+      }
+    };
+  }
+
+  @Override
+  public ReplicateWALEntryResponse replicateWALEntry(RpcController controller,
+      ReplicateWALEntryRequest request) throws ServiceException {
+    try {
+      checkOpen();
+      if (replicationServer.getReplicationSinkService() != null) {
+        requestCount.increment();
+        List<WALEntry> entries = request.getEntryList();
+        CellScanner cellScanner = ((HBaseRpcController) controller).cellScanner();
+        // TODO: CP pre
+        replicationServer.getReplicationSinkService().replicateLogEntries(entries, cellScanner,
+            request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
+            request.getSourceHFileArchiveDirPath());
+        // TODO: CP post
+        return ReplicateWALEntryResponse.newBuilder().build();
+      } else {
+        throw new ServiceException("Replication services are not initialized yet");
+      }
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+  }
+}
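
A note on the class above: every admin RPC that has no meaning on a
replication server is rejected in exactly the same way. A minimal sketch of
how that repetition could be collapsed inside ReplicationServerRpcServices;
the unsupported() helper is hypothetical, not part of this patch:

  private static ServiceException unsupported() {
    return new ServiceException(
        new UnsupportedOperationException("This is a replication server"));
  }

  @Override
  public GetRegionInfoResponse getRegionInfo(RpcController controller,
      GetRegionInfoRequest request) throws ServiceException {
    throw unsupported(); // region state is a region server concern
  }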
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
new file mode 100644
index 0000000..6a0ef3d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ReplicationTests.class, MediumTests.class})
+public class TestReplicationServer {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestReplicationServer.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationServer.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static Configuration CONF = TEST_UTIL.getConfiguration();
+
+  private static HMaster MASTER;
+
+  private static HReplicationServer replicationServer;
+
+  private static Path baseNamespaceDir;
+  private static Path hfileArchiveDir;
+  private static String replicationClusterId;
+
+  private static int BATCH_SIZE = 10;
+
+  private static TableName TABLENAME = TableName.valueOf("t");
+  private static String FAMILY = "C";
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster();
+    MASTER = TEST_UTIL.getMiniHBaseCluster().getMaster();
+
+    replicationServer = new HReplicationServer(CONF);
+    replicationServer.start();
+
+    TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster();
+    TEST_UTIL.waitFor(60000, () -> replicationServer.isOnline());
+
+    Path rootDir = CommonFSUtils.getRootDir(CONF);
+    baseNamespaceDir = new Path(rootDir, new Path(HConstants.BASE_NAMESPACE_DIR));
+    hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY));
+    replicationClusterId = "12345";
+  }
+
+  @AfterClass
+  public static void afterClass() throws IOException {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void before() throws Exception {
+    TEST_UTIL.createTable(TABLENAME, FAMILY);
+    TEST_UTIL.waitTableAvailable(TABLENAME);
+  }
+
+  @After
+  public void after() throws IOException {
+    TEST_UTIL.deleteTableIfAny(TABLENAME);
+  }
+
+  @Test
+  public void testReplicateWAL() throws Exception {
+    AsyncClusterConnection conn = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().get(0)
+        .getRegionServer().getAsyncClusterConnection();
+    AsyncRegionServerAdmin rsAdmin = conn.getRegionServerAdmin(replicationServer.getServerName());
+
+    Entry[] entries = new Entry[BATCH_SIZE];
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      entries[i] = generateEdit(i, TABLENAME, Bytes.toBytes(i));
+    }
+
+    ReplicationProtbufUtil.replicateWALEntry(rsAdmin, entries, replicationClusterId,
+        baseNamespaceDir, hfileArchiveDir, 1000);
+
+    Table table = TEST_UTIL.getConnection().getTable(TABLENAME);
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      Result result = table.get(new Get(Bytes.toBytes(i)));
+      Cell cell = result.getColumnLatestCell(Bytes.toBytes(FAMILY), Bytes.toBytes(FAMILY));
+      assertNotNull(cell);
+      assertTrue(Bytes.equals(CellUtil.cloneValue(cell), Bytes.toBytes(i)));
+    }
+  }
+
+  private static WAL.Entry generateEdit(int i, TableName tableName, byte[] row) {
+    Threads.sleep(1);
+    long timestamp = System.currentTimeMillis();
+    WALKeyImpl key = new WALKeyImpl(new byte[32], tableName, i, timestamp,
+        HConstants.DEFAULT_CLUSTER_ID, null);
+    WALEdit edit = new WALEdit();
+    edit.add(new KeyValue(row, Bytes.toBytes(FAMILY), Bytes.toBytes(FAMILY), timestamp, row));
+    return new WAL.Entry(key, edit);
+  }
+}
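
For a local run of just this new test, the usual Maven invocation should work
(module name taken from the path above; flags assumed):

  mvn test -Dtest=TestReplicationServer -pl hbase-server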


[hbase] 02/07: HBASE-24681 Remove the cache walsById/walsByIdRecoveredQueues from ReplicationSourceManager (#2019)

Posted by zg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 56465554696ce83e9bb520b88c48392101de07d3
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Mon Jul 13 17:35:32 2020 +0800

    HBASE-24681 Remove the cache walsById/walsByIdRecoveredQueues from ReplicationSourceManager (#2019)
    
    Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
 .../regionserver/ReplicationSourceManager.java     | 204 +++++++--------------
 1 file changed, 62 insertions(+), 142 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 3869857..a222f4b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -91,30 +91,6 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
  * <li>No need synchronized on {@link #sources}. {@link #sources} is a ConcurrentHashMap and there
  * is a Lock for peer id in {@link PeerProcedureHandlerImpl}. So there is no race for peer
  * operations.</li>
- * <li>Need synchronized on {@link #walsById}. There are four methods which modify it,
- * {@link #addPeer(String)}, {@link #removePeer(String)},
- * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and {@link #preLogRoll(Path)}.
- * {@link #walsById} is a ConcurrentHashMap and there is a Lock for peer id in
- * {@link PeerProcedureHandlerImpl}. So there is no race between {@link #addPeer(String)} and
- * {@link #removePeer(String)}. {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)}
- * is called by {@link ReplicationSourceInterface}. So no race with {@link #addPeer(String)}.
- * {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} firstly, then
- * remove the wals from {@link #walsById}. So no race with {@link #removePeer(String)}. The only
- * case need synchronized is {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
- * {@link #preLogRoll(Path)}.</li>
- * <li>No need synchronized on {@link #walsByIdRecoveredQueues}. There are three methods which
- * modify it, {@link #removePeer(String)} ,
- * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
- * {@link ReplicationSourceManager.NodeFailoverWorker#run()}.
- * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} is called by
- * {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the
- * {@link ReplicationSourceInterface} firstly, then remove the wals from
- * {@link #walsByIdRecoveredQueues}. And {@link ReplicationSourceManager.NodeFailoverWorker#run()}
- * will add the wals to {@link #walsByIdRecoveredQueues} firstly, then start up a
- * {@link ReplicationSourceInterface}. So there is no race here. For
- * {@link ReplicationSourceManager.NodeFailoverWorker#run()} and {@link #removePeer(String)}, there
- * is already synchronized on {@link #oldsources}. So no need synchronized on
- * {@link #walsByIdRecoveredQueues}.</li>
  * <li>Need synchronized on {@link #latestPaths} to avoid the new open source miss new log.</li>
  * <li>Need synchronized on {@link #oldsources} to avoid adding recovered source for the
  * to-be-removed peer.</li>
@@ -135,15 +111,6 @@ public class ReplicationSourceManager implements ReplicationListener {
   // All about stopping
   private final Server server;
 
-  // All logs we are currently tracking
-  // Index structure of the map is: queue_id->logPrefix/logGroup->logs
-  // For normal replication source, the peer id is same with the queue id
-  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsById;
-  // Logs for recovered sources we are currently tracking
-  // the map is: queue_id->logPrefix/logGroup->logs
-  // For recovered source, the queue id's format is peer_id-servername-*
-  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues;
-
   private final SyncReplicationPeerMappingManager syncReplicationPeerMappingManager;
 
   private final Configuration conf;
@@ -199,8 +166,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     this.replicationPeers = replicationPeers;
     this.replicationTracker = replicationTracker;
     this.server = server;
-    this.walsById = new ConcurrentHashMap<>();
-    this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
     this.oldsources = new ArrayList<>();
     this.conf = conf;
     this.fs = fs;
@@ -338,7 +303,6 @@ public class ReplicationSourceManager implements ReplicationListener {
       // Delete queue from storage and memory and queue id is same with peer id for normal
       // source
       deleteQueue(peerId);
-      this.walsById.remove(peerId);
     }
     ReplicationPeerConfig peerConfig = peer.getPeerConfig();
     if (peerConfig.isSyncReplication()) {
@@ -379,15 +343,10 @@ public class ReplicationSourceManager implements ReplicationListener {
     // synchronized on latestPaths to avoid missing the new log
     synchronized (this.latestPaths) {
       this.sources.put(peerId, src);
-      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
-      this.walsById.put(peerId, walsByGroup);
       // Add the latest wal to that source's queue
       if (!latestPaths.isEmpty()) {
         for (Map.Entry<String, Path> walPrefixAndPath : latestPaths.entrySet()) {
           Path walPath = walPrefixAndPath.getValue();
-          NavigableSet<String> wals = new TreeSet<>();
-          wals.add(walPath.getName());
-          walsByGroup.put(walPrefixAndPath.getKey(), wals);
           // Abort RS and throw exception to make add peer failed
           abortAndThrowIOExceptionWhenFail(
             () -> this.queueStorage.addWAL(server.getServerName(), peerId, walPath.getName()));
@@ -441,7 +400,10 @@ public class ReplicationSourceManager implements ReplicationListener {
       // map from walsById since later we may fail to delete them from the replication queue
       // storage, and when we retry next time, we can not know the wal files that need to be deleted
       // from the replication queue storage.
-      walsById.get(peerId).forEach((k, v) -> wals.put(k, new TreeSet<>(v)));
+      this.queueStorage.getWALsInQueue(this.server.getServerName(), peerId).forEach(wal -> {
+        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+        wals.computeIfAbsent(walPrefix, p -> new TreeSet<>()).add(wal);
+      });
     }
     LOG.info("Startup replication source for " + src.getPeerId());
     src.startup();
@@ -450,15 +412,6 @@ public class ReplicationSourceManager implements ReplicationListener {
         queueStorage.removeWAL(server.getServerName(), peerId, wal);
       }
     }
-    synchronized (walsById) {
-      Map<String, NavigableSet<String>> oldWals = walsById.get(peerId);
-      wals.forEach((k, v) -> {
-        NavigableSet<String> walsByGroup = oldWals.get(k);
-        if (walsByGroup != null) {
-          walsByGroup.removeAll(v);
-        }
-      });
-    }
     // synchronized on oldsources to avoid race with NodeFailoverWorker. Since NodeFailoverWorker is
     // a background task, we will delete the file from replication queue storage under the lock to
     // simplify the logic.
@@ -470,7 +423,6 @@ public class ReplicationSourceManager implements ReplicationListener {
           oldSource.terminate(terminateMessage);
           oldSource.getSourceMetrics().clear();
           queueStorage.removeQueue(server.getServerName(), queueId);
-          walsByIdRecoveredQueues.remove(queueId);
           iter.remove();
         }
       }
@@ -483,7 +435,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * replication queue storage and only to enqueue all logs to the new replication source
    * @param peerId the id of the replication peer
    */
-  public void refreshSources(String peerId) throws IOException {
+  public void refreshSources(String peerId) throws ReplicationException, IOException {
     String terminateMessage = "Peer " + peerId +
       " state or config changed. Will close the previous replication source and open a new one";
     ReplicationPeer peer = replicationPeers.getPeer(peerId);
@@ -496,9 +448,8 @@ public class ReplicationSourceManager implements ReplicationListener {
         // Do not clear metrics
         toRemove.terminate(terminateMessage, null, false);
       }
-      for (NavigableSet<String> walsByGroup : walsById.get(peerId).values()) {
-        walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
-      }
+      this.queueStorage.getWALsInQueue(this.server.getServerName(), peerId)
+        .forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
     }
     LOG.info("Startup replication source for " + src.getPeerId());
     src.startup();
@@ -519,9 +470,8 @@ public class ReplicationSourceManager implements ReplicationListener {
       for (String queueId : previousQueueIds) {
         ReplicationSourceInterface recoveredReplicationSource = createSource(queueId, peer);
         this.oldsources.add(recoveredReplicationSource);
-        for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) {
-          walsByGroup.forEach(wal -> recoveredReplicationSource.enqueueLog(new Path(wal)));
-        }
+        this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId)
+          .forEach(wal -> recoveredReplicationSource.enqueueLog(new Path(wal)));
         toStartup.add(recoveredReplicationSource);
       }
     }
@@ -541,7 +491,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     LOG.info("Done with the recovered queue {}", src.getQueueId());
     // Delete queue from storage and memory
     deleteQueue(src.getQueueId());
-    this.walsByIdRecoveredQueues.remove(src.getQueueId());
     return true;
   }
 
@@ -564,8 +513,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     this.sources.remove(src.getPeerId());
     // Delete queue from storage and memory
     deleteQueue(src.getQueueId());
-    this.walsById.remove(src.getQueueId());
-
   }
 
   /**
@@ -651,42 +598,19 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @param source the replication source
    */
   @VisibleForTesting
-  void cleanOldLogs(String log, boolean inclusive, ReplicationSourceInterface source) {
-    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
-    if (source.isRecovered()) {
-      NavigableSet<String> wals = walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix);
-      if (wals != null) {
-        NavigableSet<String> walsToRemove = wals.headSet(log, inclusive);
-        if (walsToRemove.isEmpty()) {
-          return;
-        }
-        cleanOldLogs(walsToRemove, source);
-        walsToRemove.clear();
-      }
-    } else {
-      NavigableSet<String> wals;
-      NavigableSet<String> walsToRemove;
-      // synchronized on walsById to avoid race with preLogRoll
-      synchronized (this.walsById) {
-        wals = walsById.get(source.getQueueId()).get(logPrefix);
-        if (wals == null) {
-          return;
-        }
-        walsToRemove = wals.headSet(log, inclusive);
-        if (walsToRemove.isEmpty()) {
-          return;
-        }
-        walsToRemove = new TreeSet<>(walsToRemove);
-      }
-      // cleanOldLogs may spend some time, especially for sync replication where we may want to
-      // remove remote wals as the remote cluster may have already been down, so we do it outside
-      // the lock to avoid block preLogRoll
-      cleanOldLogs(walsToRemove, source);
-      // now let's remove the files in the set
-      synchronized (this.walsById) {
-        wals.removeAll(walsToRemove);
-      }
+  void cleanOldLogs(String log, boolean inclusive,
+    ReplicationSourceInterface source) {
+    NavigableSet<String> walsToRemove;
+    synchronized (this.latestPaths) {
+      walsToRemove = getWalsToRemove(source.getQueueId(), log, inclusive);
     }
+    if (walsToRemove.isEmpty()) {
+      return;
+    }
+    // cleanOldLogs may spend some time, especially for sync replication where we may want to
+    // remove remote wals as the remote cluster may already be down, so we do it outside
+    // the lock to avoid blocking preLogRoll
+    cleanOldLogs(walsToRemove, source);
   }
 
   private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
@@ -767,37 +691,6 @@ public class ReplicationSourceManager implements ReplicationListener {
         abortAndThrowIOExceptionWhenFail(
           () -> this.queueStorage.addWAL(server.getServerName(), source.getQueueId(), logName));
       }
-
-      // synchronized on walsById to avoid race with cleanOldLogs
-      synchronized (this.walsById) {
-        // Update walsById map
-        for (Map.Entry<String, Map<String, NavigableSet<String>>> entry : this.walsById
-          .entrySet()) {
-          String peerId = entry.getKey();
-          Map<String, NavigableSet<String>> walsByPrefix = entry.getValue();
-          boolean existingPrefix = false;
-          for (Map.Entry<String, NavigableSet<String>> walsEntry : walsByPrefix.entrySet()) {
-            SortedSet<String> wals = walsEntry.getValue();
-            if (this.sources.isEmpty()) {
-              // If there's no slaves, don't need to keep the old wals since
-              // we only consider the last one when a new slave comes in
-              wals.clear();
-            }
-            if (logPrefix.equals(walsEntry.getKey())) {
-              wals.add(logName);
-              existingPrefix = true;
-            }
-          }
-          if (!existingPrefix) {
-            // The new log belongs to a new group, add it into this peer
-            LOG.debug("Start tracking logs for wal group {} for peer {}", logPrefix, peerId);
-            NavigableSet<String> wals = new TreeSet<>();
-            wals.add(logName);
-            walsByPrefix.put(logPrefix, wals);
-          }
-        }
-      }
-
       // Add to latestPaths
       latestPaths.put(logPrefix, newLog);
     }
@@ -969,18 +862,6 @@ public class ReplicationSourceManager implements ReplicationListener {
                 continue;
               }
             }
-            // track sources in walsByIdRecoveredQueues
-            Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
-            walsByIdRecoveredQueues.put(queueId, walsByGroup);
-            for (String wal : walsSet) {
-              String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
-              NavigableSet<String> wals = walsByGroup.get(walPrefix);
-              if (wals == null) {
-                wals = new TreeSet<>();
-                walsByGroup.put(walPrefix, wals);
-              }
-              wals.add(wal);
-            }
             oldsources.add(src);
             LOG.trace("Added source for recovered queue: " + src.getQueueId());
             for (String wal : walsSet) {
@@ -1012,7 +893,18 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @return a sorted set of wal names
    */
   @VisibleForTesting
-  public Map<String, Map<String, NavigableSet<String>>> getWALs() {
+  public Map<String, Map<String, NavigableSet<String>>> getWALs()
+    throws ReplicationException {
+    Map<String, Map<String, NavigableSet<String>>> walsById = new HashMap<>();
+    for (ReplicationSourceInterface source : sources.values()) {
+      String queueId = source.getQueueId();
+      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
+      walsById.put(queueId, walsByGroup);
+      for (String wal : this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId)) {
+        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+        walsByGroup.computeIfAbsent(walPrefix, p -> new TreeSet<>()).add(wal);
+      }
+    }
     return Collections.unmodifiableMap(walsById);
   }
 
@@ -1021,7 +913,18 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @return a sorted set of wal names
    */
   @VisibleForTesting
-  Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues() {
+  Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues()
+    throws ReplicationException {
+    Map<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues = new HashMap<>();
+    for (ReplicationSourceInterface source : oldsources) {
+      String queueId = source.getQueueId();
+      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
+      walsByIdRecoveredQueues.put(queueId, walsByGroup);
+      for (String wal : this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId)) {
+        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+        walsByGroup.computeIfAbsent(walPrefix, p -> new TreeSet<>()).add(wal);
+      }
+    }
     return Collections.unmodifiableMap(walsByIdRecoveredQueues);
   }
 
@@ -1200,4 +1103,21 @@ public class ReplicationSourceManager implements ReplicationListener {
   MetricsReplicationGlobalSourceSource getGlobalMetrics() {
     return this.globalMetrics;
   }
+
+  private NavigableSet<String> getWalsToRemove(String queueId, String log, boolean inclusive) {
+    NavigableSet<String> walsToRemove = new TreeSet<>();
+    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
+    try {
+      this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId).forEach(wal -> {
+        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+        if (walPrefix.equals(logPrefix)) {
+          walsToRemove.add(wal);
+        }
+      });
+    } catch (ReplicationException e) {
+      // Just log the exception here, as the recovered replication source will try to clean up again.
+      LOG.warn("Failed to read wals in queue {}", queueId, e);
+    }
+    return walsToRemove.headSet(log, inclusive);
+  }
 }
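
The group-by-prefix idiom above now appears three times in this file (source
startup, getWALs and getWalsByIdRecoveredQueues). A sketch of extracting it;
the groupWalsByPrefix helper is hypothetical, not part of this patch:

  // Bucket wal names read from replication queue storage into sorted sets
  // keyed by their wal-group prefix.
  private static Map<String, NavigableSet<String>> groupWalsByPrefix(
      Collection<String> wals) {
    Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
    for (String wal : wals) {
      String prefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
      walsByGroup.computeIfAbsent(prefix, p -> new TreeSet<>()).add(wal);
    }
    return walsByGroup;
  }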


[hbase] 07/07: HBASE-24684 Fetch ReplicationSink servers list from HMaster instead o… (#2077)

Posted by zg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 827ac96a210ce53ad124f694d0bddb9e048961a1
Author: XinSun <dd...@gmail.com>
AuthorDate: Sun Sep 20 10:54:43 2020 +0800

    HBASE-24684 Fetch ReplicationSink servers list from HMaster instead o… (#2077)
    
    Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
 .../src/main/protobuf/server/master/Master.proto   |  12 +-
 .../hadoop/hbase/coprocessor/MasterObserver.java   |  16 +++
 .../org/apache/hadoop/hbase/master/HMaster.java    |   5 +
 .../hadoop/hbase/master/MasterCoprocessorHost.java |  18 +++
 .../hadoop/hbase/master/MasterRpcServices.java     |  21 +++
 .../apache/hadoop/hbase/master/MasterServices.java |   6 +
 .../replication/HBaseReplicationEndpoint.java      | 146 +++++++++++++++++++--
 .../regionserver/ReplicationSource.java            |   4 +-
 .../regionserver/ReplicationSourceShipper.java     |   2 +-
 .../hbase/master/MockNoopMasterServices.java       |   5 +
 .../replication/TestHBaseReplicationEndpoint.java  |   5 +
 .../replication/TestReplicationFetchServers.java   | 106 +++++++++++++++
 .../TestGlobalReplicationThrottler.java            |   4 +
 ...stRegionReplicaReplicationEndpointNoMaster.java |   2 +
 .../regionserver/TestReplicationSource.java        |   4 +-
 15 files changed, 337 insertions(+), 19 deletions(-)

diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
index 118ce77..7dec566 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
@@ -717,6 +717,13 @@ message BalancerDecisionsResponse {
   repeated BalancerDecision balancer_decision = 1;
 }
 
+message ListReplicationSinkServersRequest {
+}
+
+message ListReplicationSinkServersResponse {
+  repeated ServerName server_name = 1;
+}
+
 service MasterService {
   /** Used by the client to get the number of regions that have received the updated schema */
   rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
@@ -1146,10 +1153,13 @@ service MasterService {
     returns (RenameRSGroupResponse);
 
   rpc UpdateRSGroupConfig(UpdateRSGroupConfigRequest)
-  returns (UpdateRSGroupConfigResponse);
+    returns (UpdateRSGroupConfigResponse);
 
   rpc GetLogEntries(LogRequest)
     returns(LogEntry);
+
+  rpc ListReplicationSinkServers(ListReplicationSinkServersRequest)
+    returns (ListReplicationSinkServersResponse);
 }
 
 // HBCK Service definitions.
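
From the client side the new RPC is an empty request answered with a list of
ServerName protos. A hedged sketch of the round trip, assuming masterStub is a
MasterService.BlockingInterface bound to the peer's active master (the same
pattern HBaseReplicationEndpoint uses further down):

  ListReplicationSinkServersResponse resp = masterStub.listReplicationSinkServers(
      null, ListReplicationSinkServersRequest.newBuilder().build());
  List<ServerName> sinks = ProtobufUtil.toServerNameList(resp.getServerNameList());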
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
index ac35caa..ec009cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
@@ -1782,4 +1782,20 @@ public interface MasterObserver {
   default void postHasUserPermissions(ObserverContext<MasterCoprocessorEnvironment> ctx,
       String userName, List<Permission> permissions) throws IOException {
   }
+
+  /**
+   * Called before the master fetches the list of replication sink servers.
+   * @param ctx the coprocessor instance's environment
+   */
+  default void preListReplicationSinkServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
+    throws IOException {
+  }
+
+  /**
+   * Called after the master fetches the list of replication sink servers.
+   * @param ctx the coprocessor instance's environment
+   */
+  default void postListReplicationSinkServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
+    throws IOException {
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index e4bd3c5..d1a8280 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -3900,4 +3900,9 @@ public class HMaster extends HRegionServer implements MasterServices {
   public RSGroupInfoManager getRSGroupInfoManager() {
     return rsGroupInfoManager;
   }
+
+  @Override
+  public List<ServerName> listReplicationSinkServers() throws IOException {
+    return this.serverManager.getOnlineServersList();
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
index 01d1a62..f775eba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
@@ -2038,4 +2038,22 @@ public class MasterCoprocessorHost
       }
     });
   }
+
+  public void preListReplicationSinkServers() throws IOException {
+    execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
+      @Override
+      public void call(MasterObserver observer) throws IOException {
+        observer.preListReplicationSinkServers(this);
+      }
+    });
+  }
+
+  public void postListReplicationSinkServers() throws IOException {
+    execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
+      @Override
+      public void call(MasterObserver observer) throws IOException {
+        observer.postListReplicationSinkServers(this);
+      }
+    });
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 37fc589..314a2d6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -263,6 +263,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamesp
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
@@ -3375,4 +3377,23 @@ public class MasterRpcServices extends RSRpcServices implements
       .addAllBalancerDecision(balancerDecisions).build();
   }
 
+  @Override
+  public ListReplicationSinkServersResponse listReplicationSinkServers(
+    RpcController controller, ListReplicationSinkServersRequest request)
+    throws ServiceException {
+    ListReplicationSinkServersResponse.Builder builder =
+      ListReplicationSinkServersResponse.newBuilder();
+    try {
+      if (master.getMasterCoprocessorHost() != null) {
+        master.getMasterCoprocessorHost().preListReplicationSinkServers();
+      }
+      builder.addAllServerName(master.listReplicationSinkServers().stream()
+        .map(ProtobufUtil::toServerName).collect(Collectors.toList()));
+      if (master.getMasterCoprocessorHost() != null) {
+        master.getMasterCoprocessorHost().postListReplicationSinkServers();
+      }
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return builder.build();
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index 908d212..7b21289 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -553,4 +553,10 @@ public interface MasterServices extends Server {
    * @return The state of the load balancer, or false if the load balancer isn't defined.
    */
   boolean isBalancerOn();
+
+  /**
+   * Get the addresses of the servers that can act as replication sinks.
+   * @return a list of server addresses
+   */
+  List<ServerName> listReplicationSinkServers() throws IOException;
 }
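
HMaster's implementation above returns every online server. A hypothetical
variant, not part of this patch, could additionally keep draining servers out
of the sink list via ServerManager:

  @Override
  public List<ServerName> listReplicationSinkServers() throws IOException {
    // getOnlineServersList() hands back a copy, so removeAll is safe here.
    List<ServerName> online = serverManager.getOnlineServersList();
    online.removeAll(serverManager.getDrainingServersList());
    return online;
  }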
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
index 8d60f23..3513020 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hbase.replication;
 
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -28,21 +31,26 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
 import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.zookeeper.ZKListener;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKListener;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.AuthFailedException;
 import org.apache.zookeeper.KeeperException.ConnectionLossException;
@@ -52,6 +60,12 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
 
 /**
  * A {@link BaseReplicationEndpoint} for replication endpoints whose
@@ -63,6 +77,13 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
 
   private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class);
 
+  public static final String FETCH_SERVERS_USE_ZK_CONF_KEY =
+      "hbase.replication.fetch.servers.usezk";
+
+  public static final String FETCH_SERVERS_INTERVAL_CONF_KEY =
+      "hbase.replication.fetch.servers.interval";
+  public static final int DEFAULT_FETCH_SERVERS_INTERVAL = 10 * 60 * 1000; // 10 mins
+
   private ZKWatcher zkw = null;
   private final Object zkwLock = new Object();
 
@@ -94,6 +115,11 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
 
   private List<ServerName> sinkServers = new ArrayList<>(0);
 
+  private AsyncClusterConnection peerConnection;
+  private boolean fetchServersUseZk = false;
+  private FetchServersChore fetchServersChore;
+  private int shortOperationTimeout;
+
   /*
    * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
    * Connection implementations, or initialize it in a different way, so defining createConnection
@@ -129,6 +155,19 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
         LOG.warn("{} Failed to close the connection", ctx.getPeerId());
       }
     }
+    if (fetchServersChore != null) {
+      ChoreService choreService = ctx.getServer().getChoreService();
+      if (null != choreService) {
+        choreService.cancelChore(fetchServersChore);
+      }
+    }
+    if (peerConnection != null) {
+      try {
+        peerConnection.close();
+      } catch (IOException e) {
+        LOG.warn("Attempt to close peerConnection failed.", e);
+      }
+    }
   }
 
   /**
@@ -159,8 +198,27 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
   }
 
   @Override
-  protected void doStart() {
+  protected synchronized void doStart() {
+    this.shortOperationTimeout = ctx.getLocalConfiguration().getInt(
+        HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);
     try {
+      if (ctx.getLocalConfiguration().getBoolean(FETCH_SERVERS_USE_ZK_CONF_KEY, false)) {
+        fetchServersUseZk = true;
+      } else {
+        try {
+          if (ReplicationUtils.isPeerClusterSupportReplicationOffload(getPeerConnection())) {
+            fetchServersChore = new FetchServersChore(ctx.getServer(), this);
+            ctx.getServer().getChoreService().scheduleChore(fetchServersChore);
+            fetchServersUseZk = false;
+          } else {
+            fetchServersUseZk = true;
+          }
+        } catch (Throwable t) {
+          fetchServersUseZk = true;
+          LOG.warn("Peer {} try to fetch servers by admin failed. Using zk impl.",
+              ctx.getPeerId(), t);
+        }
+      }
       reloadZkWatcher();
       connectPeerCluster();
       notifyStarted();
@@ -203,7 +261,9 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
       }
       zkw = new ZKWatcher(ctx.getConfiguration(),
           "connection to cluster: " + ctx.getPeerId(), this);
-      zkw.registerListener(new PeerRegionServerListener(this));
+      if (fetchServersUseZk) {
+        zkw.registerListener(new PeerRegionServerListener(this));
+      }
     }
   }
 
@@ -229,11 +289,46 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
   }
 
   /**
+   * Get the connection to the peer cluster.
+   * @return the connection to the peer cluster
+   * @throws IOException If anything goes wrong connecting
+   */
+  private synchronized AsyncClusterConnection getPeerConnection() throws IOException {
+    if (peerConnection == null) {
+      Configuration conf = ctx.getConfiguration();
+      peerConnection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null,
+          UserProvider.instantiate(conf).getCurrent());
+    }
+    return peerConnection;
+  }
+
+  /**
+   * Get the list of all servers that can act as replication sinks, fetched
+   * from the active master of the peer cluster.
+   * @return list of server addresses, or an empty list if the peer is unavailable
+   */
+  protected List<ServerName> fetchSlavesAddresses() {
+    try {
+      AsyncClusterConnection peerConn = getPeerConnection();
+      ServerName master = FutureUtils.get(peerConn.getAdmin().getMaster());
+      MasterService.BlockingInterface masterStub = MasterService.newBlockingStub(
+        peerConn.getRpcClient()
+          .createBlockingRpcChannel(master, User.getCurrent(), shortOperationTimeout));
+      ListReplicationSinkServersResponse resp = masterStub
+        .listReplicationSinkServers(null, ListReplicationSinkServersRequest.newBuilder().build());
+      return ProtobufUtil.toServerNameList(resp.getServerNameList());
+    } catch (ServiceException | IOException e) {
+      LOG.error("Peer {} fetches servers failed", ctx.getPeerId(), e);
+    }
+    return Collections.emptyList();
+  }
+
+  /**
    * Get the list of all the region servers from the specified peer
    *
    * @return list of region server addresses or an empty list if the slave is unavailable
    */
-  protected List<ServerName> fetchSlavesAddresses() {
+  protected List<ServerName> fetchSlavesAddressesByZK() {
     List<String> children = null;
     try {
       synchronized (zkwLock) {
@@ -256,7 +351,12 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
   }
 
   protected synchronized void chooseSinks() {
-    List<ServerName> slaveAddresses = fetchSlavesAddresses();
+    List<ServerName> slaveAddresses;
+    if (fetchServersUseZk) {
+      slaveAddresses = fetchSlavesAddressesByZK();
+    } else {
+      slaveAddresses = fetchSlavesAddresses();
+    }
     if (slaveAddresses.isEmpty()) {
       LOG.warn("No sinks available at peer. Will not be able to replicate");
     }
@@ -287,6 +387,14 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
     return createSinkPeer(serverName);
   }
 
+  private SinkPeer createSinkPeer(ServerName serverName) throws IOException {
+    if (ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) {
+      return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName));
+    } else {
+      return new RegionServerSinkPeer(serverName, conn.getRegionServerAdmin(serverName));
+    }
+  }
+
   /**
    * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it
    * failed). If a single SinkPeer is reported as bad more than
@@ -396,11 +504,23 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
     }
   }
 
-  private SinkPeer createSinkPeer(ServerName serverName) throws IOException {
-    if (ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) {
-      return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName));
-    } else {
-      return new RegionServerSinkPeer(serverName, conn.getRegionServerAdmin(serverName));
+  /**
+   * Chore that periodically fetches the list of sink servers from the peer master.
+   */
+  public static class FetchServersChore extends ScheduledChore {
+
+    private HBaseReplicationEndpoint endpoint;
+
+    public FetchServersChore(Server server, HBaseReplicationEndpoint endpoint) {
+      super("Peer-" + endpoint.ctx.getPeerId() + "-FetchServersChore", server,
+        server.getConfiguration()
+          .getInt(FETCH_SERVERS_INTERVAL_CONF_KEY, DEFAULT_FETCH_SERVERS_INTERVAL));
+      this.endpoint = endpoint;
+    }
+
+    @Override
+    protected void chore() {
+      endpoint.chooseSinks();
     }
   }
 }
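
The endpoint now exposes two tunables: hbase.replication.fetch.servers.usezk
forces the older ZooKeeper-based sink discovery, and
hbase.replication.fetch.servers.interval sets the chore period (default 10
minutes). A sketch of setting them programmatically, values illustrative:

  Configuration conf = HBaseConfiguration.create();
  // Opt out of the master RPC and keep reading sink servers from ZooKeeper.
  conf.setBoolean(HBaseReplicationEndpoint.FETCH_SERVERS_USE_ZK_CONF_KEY, true);
  // Refresh the sink list every 5 minutes instead of the 10-minute default.
  conf.setInt(HBaseReplicationEndpoint.FETCH_SERVERS_INTERVAL_CONF_KEY, 5 * 60 * 1000);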
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 879c604..2bf575d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -343,9 +343,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
         Threads.setDaemonThreadRunning(
             walReader, Thread.currentThread().getName()
             + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
-          (t,e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
+          (t,e) -> this.uncaughtException(t, e, null, this.getPeerId()));
         worker.setWALReader(walReader);
-        worker.startup((t,e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
+        worker.startup((t,e) -> this.uncaughtException(t, e, null, this.getPeerId()));
         return worker;
       }
     });
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index eb558e0..a1983e7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -369,6 +369,6 @@ public class ReplicationSourceShipper extends Thread {
 
     LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.",
       totalToDecrement.longValue());
-    source.getSourceManager().getTotalBufferUsed().addAndGet(-totalToDecrement.longValue());
+    source.controller.getTotalBufferUsed().addAndGet(-totalToDecrement.longValue());
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index 7c65005..947ba5d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -507,4 +507,9 @@ public class MockNoopMasterServices implements MasterServices {
   public boolean isBalancerOn() {
     return false;
   }
+
+  @Override
+  public List<ServerName> listReplicationSinkServers() throws IOException {
+    return null;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
index 4182eaf..6765794 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
@@ -199,6 +199,11 @@ public class TestHBaseReplicationEndpoint {
     }
 
     @Override
+    public List<ServerName> fetchSlavesAddressesByZK() {
+      return regionServers;
+    }
+
+    @Override
     public boolean replicate(ReplicateContext replicateContext) {
       return false;
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java
new file mode 100644
index 0000000..9ceacee
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.MasterObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationFetchServers extends TestReplicationBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestReplicationFetchServers.class);
+
+  private static AtomicBoolean fetchFlag = new AtomicBoolean(false);
+
+  public static class MyObserver implements MasterCoprocessor, MasterObserver {
+
+    @Override
+    public Optional<MasterObserver> getMasterObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
+    public void postListReplicationSinkServers(ObserverContext<MasterCoprocessorEnvironment> ctx) {
+      fetchFlag.set(true);
+    }
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    CONF2.set(MASTER_COPROCESSOR_CONF_KEY, MyObserver.class.getName());
+    TestReplicationBase.setUpBeforeClass();
+  }
+
+  @Before
+  public void beforeMethod() {
+    fetchFlag.set(false);
+  }
+
+  @Test
+  public void testMasterListReplicationPeerServers() throws IOException, ServiceException {
+    AsyncClusterConnection conn = UTIL2.getAsyncConnection();
+    ServerName master = UTIL2.getAdmin().getMaster();
+    MasterService.BlockingInterface masterStub = MasterService.newBlockingStub(
+        conn.getRpcClient().createBlockingRpcChannel(master, User.getCurrent(), 1000));
+    ListReplicationSinkServersResponse resp = masterStub.listReplicationSinkServers(
+        null, ListReplicationSinkServersRequest.newBuilder().build());
+    List<ServerName> servers = ProtobufUtil.toServerNameList(resp.getServerNameList());
+    assertFalse(servers.isEmpty());
+    assertTrue(fetchFlag.get());
+  }
+
+  @Test
+  public void testPutData() throws IOException {
+    htable1.put(new Put(row).addColumn(famName, famName, row));
+    UTIL2.waitFor(30000L, () -> !htable2.get(new Get(row)).isEmpty());
+    assertTrue(fetchFlag.get());
+  }
+}
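
The new test drives the ListReplicationSinkServers RPC two ways: directly through a blocking master stub, and indirectly through a put whose sink-server lookup trips the coprocessor flag. For reference, the direct call factors into a small helper; this sketch reuses the exact calls from the test, with a hypothetical class name and the same illustrative 1000 ms RPC timeout:

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.security.User;

import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;

public final class ListSinkServersSketch {

  private ListSinkServersSketch() {
  }

  /** Asks the active master for the current replication sink servers. */
  public static List<ServerName> listSinkServers(AsyncClusterConnection conn, ServerName master)
      throws IOException, ServiceException {
    MasterService.BlockingInterface stub = MasterService.newBlockingStub(
        conn.getRpcClient().createBlockingRpcChannel(master, User.getCurrent(), 1000));
    ListReplicationSinkServersResponse resp = stub.listReplicationSinkServers(
        null, ListReplicationSinkServersRequest.newBuilder().build());
    return ProtobufUtil.toServerNameList(resp.getServerNameList());
  }
}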
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java
index 1538fa3..cfc9fa3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java
@@ -118,6 +118,10 @@ public class TestGlobalReplicationThrottler {
 
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
+    Admin admin1 = utility1.getAdmin();
+    admin1.removeReplicationPeer("peer1");
+    admin1.removeReplicationPeer("peer2");
+    admin1.removeReplicationPeer("peer3");
     utility2.shutdownMiniCluster();
     utility1.shutdownMiniCluster();
   }
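
Removing the peers before the mini clusters go down stops the replication sources from racing cluster shutdown. removeReplicationPeer can throw when a peer is unknown, so a teardown that may run after a partial test failure could wrap the call; a minimal sketch, with a hypothetical helper name:

import java.io.IOException;

import org.apache.hadoop.hbase.client.Admin;

final class PeerTeardown {

  private PeerTeardown() {
  }

  /** Removes a replication peer, swallowing the error if it does not exist. */
  static void removePeerQuietly(Admin admin, String peerId) {
    try {
      admin.removeReplicationPeer(peerId);
    } catch (IOException e) {
      // The peer was never added or is already gone; let teardown continue.
    }
  }
}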
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
index ee1ae5f..c676e30 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
@@ -256,11 +256,13 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
 
     ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
     when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
+    when(context.getLocalConfiguration()).thenReturn(HTU.getConfiguration());
     when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
     when(context.getServer()).thenReturn(rs0);
     when(context.getTableDescriptors()).thenReturn(rs0.getTableDescriptors());
     replicator.init(context);
     replicator.startAsync();
+    HTU.waitFor(30000, replicator::isRunning);
 
     //load some data to primary
     HTU.loadNumericRows(table, f, 0, 1000);
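
startAsync() returns before the endpoint is necessarily running, so without the added wait the row load could race endpoint startup; the bounded wait fails the test cleanly instead of letting it hang. HTU.waitFor supplies the polling here; outside the test utility, an equivalent bounded poll is a few lines. A sketch with illustrative names:

import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

final class StartupWaits {

  private StartupWaits() {
  }

  /** Polls until the condition holds, or throws once the timeout elapses. */
  static void await(long timeoutMs, long intervalMs, BooleanSupplier condition)
      throws InterruptedException, TimeoutException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (!condition.getAsBoolean()) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException("Condition not met within " + timeoutMs + " ms");
      }
      Thread.sleep(intervalMs);
    }
  }
}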
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 5817c00..1d9081a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -284,7 +284,7 @@ public class TestReplicationSource {
     ReplicationPeer mockPeer = mock(ReplicationPeer.class);
     Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
     Configuration testConf = HBaseConfiguration.create();
-    source.init(testConf, null, mockManager, null, mockPeer, null,
+    source.init(testConf, null, null, mockManager, null, mockPeer, null,
       "testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class));
     ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null,
       conf, null, 0, null, source);
@@ -310,7 +310,7 @@ public class TestReplicationSource {
     reader.addEntryToBatch(batch, mockEntry);
     reader.entryBatchQueue.put(batch);
     source.terminate("test");
-    assertEquals(0, source.getSourceManager().getTotalBufferUsed().get());
+    assertEquals(0, source.controller.getTotalBufferUsed().get());
   }
 
   /**


[hbase] 01/07: HBASE-24682 Refactor ReplicationSource#addHFileRefs method: move it to ReplicationSourceManager (#2020)

Posted by zg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit c315e684214e6f18fdfd2156333b0396b7dc7d29
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Wed Jul 8 14:29:08 2020 +0800

    HBASE-24682 Refactor ReplicationSource#addHFileRefs method: move it to ReplicationSourceManager (#2020)
    
    Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
 .../regionserver/ReplicationSource.java            | 38 +----------------
 .../regionserver/ReplicationSourceInterface.java   | 14 -------
 .../regionserver/ReplicationSourceManager.java     | 48 +++++++++++++++++++++-
 .../hbase/replication/ReplicationSourceDummy.java  |  9 +---
 4 files changed, 49 insertions(+), 60 deletions(-)
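
In short, the per-source hook is deleted and the manager resolves the peer itself before touching queue storage. A condensed view of the new flow, with names taken from the diff that follows:

// After this commit (condensed from ReplicationSourceManager below):
//
//   public void addHFileRefs(tableName, family, pairs) {            // entry point, unchanged
//     for (ReplicationSourceInterface source : sources.values()) {
//       addHFileRefs(source.getPeerId(), tableName, family, pairs); // new private helper
//     }
//   }
//
// The private helper applies the per-peer filter the source used to apply
// itself: explicit table CFs first, then namespaces, else replicate everything.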

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index bf8127f..2716ade 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -28,7 +28,6 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -38,6 +37,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.hadoop.conf.Configuration;
@@ -48,21 +48,17 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableDescriptors;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
 import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
 import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -280,38 +276,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
     }
   }
 
-  @Override
-  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
-      throws ReplicationException {
-    String peerId = replicationPeer.getId();
-    Set<String> namespaces = replicationPeer.getNamespaces();
-    Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
-    if (tableCFMap != null) { // All peers with TableCFs
-      List<String> tableCfs = tableCFMap.get(tableName);
-      if (tableCFMap.containsKey(tableName)
-          && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
-        this.queueStorage.addHFileRefs(peerId, pairs);
-        metrics.incrSizeOfHFileRefsQueue(pairs.size());
-      } else {
-        LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
-            tableName, Bytes.toString(family), peerId);
-      }
-    } else if (namespaces != null) { // Only for set NAMESPACES peers
-      if (namespaces.contains(tableName.getNamespaceAsString())) {
-        this.queueStorage.addHFileRefs(peerId, pairs);
-        metrics.incrSizeOfHFileRefsQueue(pairs.size());
-      } else {
-        LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
-            tableName, Bytes.toString(family), peerId);
-      }
-    } else {
-      // user has explicitly not defined any table cfs for replication, means replicate all the
-      // data
-      this.queueStorage.addHFileRefs(peerId, pairs);
-      metrics.incrSizeOfHFileRefsQueue(pairs.size());
-    }
-  }
-
   private ReplicationEndpoint createReplicationEndpoint()
       throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
     RegionServerCoprocessorHost rsServerHost = null;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 0bd90cf..33a413f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -29,12 +29,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -63,17 +60,6 @@ public interface ReplicationSourceInterface {
   void enqueueLog(Path log);
 
   /**
-   * Add hfile names to the queue to be replicated.
-   * @param tableName Name of the table these files belongs to
-   * @param family Name of the family these files belong to
-   * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
-   *          will be added in the queue for replication}
-   * @throws ReplicationException If failed to add hfile references
-   */
-  void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
-      throws ReplicationException;
-
-  /**
    * Start the replication
    */
   void startup();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 0940b5a..3869857 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
@@ -173,6 +174,8 @@ public class ReplicationSourceManager implements ReplicationListener {
   private final long totalBufferLimit;
   private final MetricsReplicationGlobalSourceSource globalMetrics;
 
+  private final Map<String, MetricsSource> sourceMetrics = new HashMap<>();
+
   /**
    * Creates a replication manager and sets the watch on all the other registered region servers
    * @param queueStorage the interface for manipulating replication queues
@@ -355,6 +358,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);
 
     MetricsSource metrics = new MetricsSource(queueId);
+    sourceMetrics.put(queueId, metrics);
     // init replication source
     src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
       walFileLengthProvider, metrics);
@@ -1139,7 +1143,49 @@ public class ReplicationSourceManager implements ReplicationListener {
   public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
       throws IOException {
     for (ReplicationSourceInterface source : this.sources.values()) {
-      throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs));
+      throwIOExceptionWhenFail(() -> addHFileRefs(source.getPeerId(), tableName, family, pairs));
+    }
+  }
+
+  /**
+   * Add hfile names to the queue to be replicated.
+   * @param peerId the replication peer id
+   * @param tableName Name of the table these files belong to
+   * @param family Name of the family these files belong to
+   * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir }
+   *          which will be added to the queue for replication
+   * @throws ReplicationException If failed to add hfile references
+   */
+  private void addHFileRefs(String peerId, TableName tableName, byte[] family,
+    List<Pair<Path, Path>> pairs) throws ReplicationException {
+    // Only the normal replication sources are updated here; their peerId equals their queueId.
+    MetricsSource metrics = sourceMetrics.get(peerId);
+    ReplicationPeer replicationPeer = replicationPeers.getPeer(peerId);
+    Set<String> namespaces = replicationPeer.getNamespaces();
+    Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
+    if (tableCFMap != null) { // All peers with TableCFs
+      List<String> tableCfs = tableCFMap.get(tableName);
+      if (tableCFMap.containsKey(tableName)
+        && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
+        this.queueStorage.addHFileRefs(peerId, pairs);
+        metrics.incrSizeOfHFileRefsQueue(pairs.size());
+      } else {
+        LOG.debug("HFiles belonging to table {} family {} will not be replicated to peer id {}",
+          tableName, Bytes.toString(family), peerId);
+      }
+    } else if (namespaces != null) { // Only for set NAMESPACES peers
+      if (namespaces.contains(tableName.getNamespaceAsString())) {
+        this.queueStorage.addHFileRefs(peerId, pairs);
+        metrics.incrSizeOfHFileRefsQueue(pairs.size());
+      } else {
+        LOG.debug("HFiles belonging to table {} family {} will not be replicated to peer id {}",
+          tableName, Bytes.toString(family), peerId);
+      }
+    } else {
+      // The user has explicitly not defined any table CFs for replication, which
+      // means replicate all the data.
+      this.queueStorage.addHFileRefs(peerId, pairs);
+      metrics.incrSizeOfHFileRefsQueue(pairs.size());
     }
   }
 
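The three-way branch above reduces to a small decision table; a few illustrative cases, with hypothetical table and family names:

// Given addHFileRefs(peerId, t1, f1, pairs), the refs are queued iff
// (hypothetical peer configurations):
//
//   tableCFs = { t1 -> null }             -> queued: all CFs of t1 replicate
//   tableCFs = { t1 -> ["f1"] }           -> queued: f1 is listed for t1
//   tableCFs = { t1 -> ["f2"] }           -> skipped: f1 not listed for t1
//   tableCFs = { t2 -> [...] }            -> skipped: t1 has no entry
//   tableCFs = null, namespaces = {"ns"}  -> queued iff t1 is in namespace ns
//   tableCFs = null, namespaces = null    -> queued: replicate all data
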
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index a361c44..781a1da 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -21,17 +21,16 @@ import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
 /**
@@ -114,12 +113,6 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   }
 
   @Override
-  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> files)
-      throws ReplicationException {
-    return;
-  }
-
-  @Override
   public boolean isPeerEnabled() {
     return true;
   }