Posted to commits@hbase.apache.org by zg...@apache.org on 2020/09/14 07:00:33 UTC

[hbase] branch HBASE-24666 updated (f3dc6f6 -> c302a9e)

This is an automated email from the ASF dual-hosted git repository.

zghao pushed a change to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git.


    omit f3dc6f6  HBASE-24982 Disassemble the method replicateWALEntry from AdminService to a new interface ReplicationServerService (#2360)
    omit e053a00  HBASE-24683 Add a basic ReplicationServer which only implement ReplicationSink Service (#2111)
    omit 54e70e5  HBASE-24735: Refactor ReplicationSourceManager: move logPositionAndCleanOldLogs/cleanUpHFileRefs to ReplicationSource inside (#2064)
    omit b659616  HBASE-24681 Remove the cache walsById/walsByIdRecoveredQueues from ReplicationSourceManager (#2019)
    omit 57bafbb  HBASE-24682 Refactor ReplicationSource#addHFileRefs method: move it to ReplicationSourceManager (#2020)
     add a589e55  HBASE-24992 log after Generator success when running ITBLL (#2358)
     add d48c732  HBASE-24602 Add Increment and Append support to CheckAndMutate (#2228)
     add 0d95a8f  HBASE-24979 : Client operation timeout test for batch requests
     add 2250b51  HBASE-24995: MetaFixer fails to fix overlaps when multiple tables have overlaps (#2361)
     add 2e638de  HBASE-24994 Add hedgedReadOpsInCurThread metric (#2365)
     add 2e96a5b  HBASE-24993 Remove OfflineMetaRebuildTestCore (#2359)
     add 0511089  HBASE-25000 Move delete region info related methods to RegionStateStore (#2366)
     add 84a34be  HBASE-24974: Provide a flexibility to print only row key and filter for multiple tables in the WALPrettyPrinter (#2345)
     add 23713f4  HBASE-25005 Refactor CatalogJanitor (#2373)
     add f36c55c  HBASE-24990 Fix empty value of properties 'hbase.replication.source.maxthreads' in hbase-thrift module (#2356)
     add bc15b61  HBASE-24776 [hbtop] Support Batch mode (#2291)
     add ce59a2b  HBASE-25004 : Log RegionTooBusyException details (#2371)
     add bbfbe33  HBASE-24958 CompactingMemStore.timeOfOldestEdit error update (#2321)
     add 9c5dbb2  HBASE-24764: Add support of adding default peer configs via hbase-site.xml for all replication peers. (#2284)
     add 3c00ff5  HBASE-23643 Add document for "HBASE-23065 [hbtop] Top-N heavy hitter user and client drill downs" (#2381)
     add e5ca9ad  HBASE-25008 Add document for "HBASE-24776 [hbtop] Support Batch mode" (#2382)
     add fe77630  HBASE-25006 Make the cost functions optional for StochastoicBalancer
     add a393fc5  HBASE-25016 Should close ResultScanner in MetaTableAccessor.scanByRegionEncodedName
     add 0f00e1c  HBASE-25018 EOM cleanup (#2391)
     new 8e1bf74  HBASE-24682 Refactor ReplicationSource#addHFileRefs method: move it to ReplicationSourceManager (#2020)
     new d90c20e  HBASE-24681 Remove the cache walsById/walsByIdRecoveredQueues from ReplicationSourceManager (#2019)
     new d490870  HBASE-24735: Refactor ReplicationSourceManager: move logPositionAndCleanOldLogs/cleanUpHFileRefs to ReplicationSource inside (#2064)
     new 2e95e75  HBASE-24683 Add a basic ReplicationServer which only implement ReplicationSink Service (#2111)
     new c302a9e  HBASE-24982 Disassemble the method replicateWALEntry from AdminService to a new interface ReplicationServerService (#2360)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (f3dc6f6)
            \
             N -- N -- N   refs/heads/HBASE-24666 (c302a9e)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
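
For reference, a minimal command sequence that produces a history like the
diagram above (the branch name matches this email, but B and N1..N3 are
illustrative placeholders, not identifiers from this repository):

    git checkout HBASE-24666
    git reset --hard B               # rewind the branch to the common base
    git cherry-pick N1 N2 N3         # re-apply the work as new commits
    git push --force origin HBASE-24666

After the forced push, the old O commits remain reachable only through other
references, which is why they are listed above as "omit" rather than
"discard".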


Summary of changes:
 .../org/apache/hadoop/hbase/MetaTableAccessor.java |  75 +-
 .../apache/hadoop/hbase/client/CheckAndMutate.java |  33 +-
 .../hadoop/hbase/client/RawAsyncTableImpl.java     |   7 +-
 .../hadoop/hbase/client/RegionInfoBuilder.java     |   6 +-
 .../replication/ReplicationPeerConfigUtil.java     |  38 +
 .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java |   4 +
 .../hbase/shaded/protobuf/RequestConverter.java    |  63 +-
 .../hbase/shaded/protobuf/ResponseConverter.java   |   4 +-
 .../regionserver/MetricsRegionServerSource.java    |   3 +
 .../MetricsRegionServerSourceImpl.java             |   2 +
 .../regionserver/MetricsRegionServerWrapper.java   |   5 +
 .../java/org/apache/hadoop/hbase/hbtop/HBTop.java  | 164 +++-
 .../hbase/hbtop/screen/AbstractScreenView.java     |   1 +
 .../apache/hadoop/hbase/hbtop/screen/Screen.java   |  26 +-
 .../hbase/hbtop/screen/top/TopScreenModel.java     |  49 +-
 .../hbase/hbtop/screen/top/TopScreenPresenter.java |  48 +-
 .../hbase/hbtop/screen/top/TopScreenView.java      |  48 +-
 .../hadoop/hbase/hbtop/terminal/Terminal.java      |   2 +-
 .../hbtop/terminal/impl/batch/BatchTerminal.java   |  80 ++
 .../terminal/impl/batch/BatchTerminalPrinter.java  |  33 +-
 .../hbase/hbtop/screen/top/TestTopScreenModel.java |  16 +-
 .../hbtop/screen/top/TestTopScreenPresenter.java   |   3 +-
 .../hbase/test/IntegrationTestBigLinkedList.java   |   1 +
 .../replication/TestZKReplicationPeerStorage.java  |  47 ++
 .../hadoop/hbase/coprocessor/RegionObserver.java   |  19 +-
 .../apache/hadoop/hbase/master/CatalogJanitor.java | 819 --------------------
 .../org/apache/hadoop/hbase/master/HMaster.java    |   4 +-
 .../hadoop/hbase/master/MasterRpcServices.java     |   1 +
 .../apache/hadoop/hbase/master/MasterServices.java |   1 +
 .../hbase/master/assignment/GCRegionProcedure.java |   3 +-
 .../hbase/master/assignment/RegionStateStore.java  |  53 +-
 .../master/balancer/StochasticLoadBalancer.java    |  32 +-
 .../hbase/master/janitor/CatalogJanitor.java       | 456 +++++++++++
 .../hbase/master/{ => janitor}/MetaFixer.java      |  46 +-
 .../apache/hadoop/hbase/master/janitor/Report.java | 128 ++++
 .../hbase/master/janitor/ReportMakingVisitor.java  | 294 ++++++++
 .../master/procedure/RestoreSnapshotProcedure.java |   8 +-
 .../master/replication/ReplicationPeerManager.java |   5 +
 .../hbase/regionserver/AbstractMemStore.java       |   6 +-
 .../hbase/regionserver/CompactingMemStore.java     |   1 +
 .../hadoop/hbase/regionserver/DefaultMemStore.java |   1 +
 .../apache/hadoop/hbase/regionserver/HRegion.java  | 837 +++++++++++----------
 .../MetricsRegionServerWrapperImpl.java            |   5 +
 .../regionserver/MiniBatchOperationInProgress.java |  18 +
 .../hadoop/hbase/regionserver/OperationStatus.java |  26 +-
 .../hadoop/hbase/regionserver/RSRpcServices.java   | 162 +---
 .../apache/hadoop/hbase/regionserver/Region.java   |   3 +-
 .../hbase/security/access/AccessController.java    |   5 +-
 .../apache/hadoop/hbase/util/HBaseFsckRepair.java  |   3 +-
 .../apache/hadoop/hbase/wal/WALPrettyPrinter.java  |  62 +-
 .../main/resources/hbase-webapps/master/hbck.jsp   |   6 +-
 .../hadoop/hbase/TestClientOperationTimeout.java   |  85 ++-
 .../org/apache/hadoop/hbase/client/TestAdmin1.java |   2 +-
 .../hbase/client/TestAsyncRegionAdminApi2.java     |   2 +-
 .../apache/hadoop/hbase/client/TestAsyncTable.java | 230 ++++++
 .../hadoop/hbase/client/TestAsyncTableBatch.java   |  35 +-
 .../hadoop/hbase/client/TestCheckAndMutate.java    | 219 ++++++
 .../hadoop/hbase/client/TestFromClientSide3.java   |  35 +-
 .../hbase/master/MockNoopMasterServices.java       |   1 +
 .../hbase/master/TestMasterChoreScheduled.java     |   1 +
 .../master/{ => janitor}/TestCatalogJanitor.java   |  44 +-
 .../{ => janitor}/TestCatalogJanitorCluster.java   | 141 ++--
 .../TestCatalogJanitorInMemoryStates.java          |  66 +-
 .../hbase/master/{ => janitor}/TestMetaFixer.java  |  72 +-
 .../{ => janitor}/TestMetaFixerNoCluster.java      |  58 +-
 .../MetricsRegionServerWrapperStub.java            |   5 +
 .../hbase/regionserver/TestAtomicOperation.java    |   2 +-
 .../hbase/regionserver/TestCompactingMemStore.java |  24 +
 .../hadoop/hbase/regionserver/TestHRegion.java     | 137 +++-
 .../hbase/replication/TestMasterReplication.java   |  80 ++
 .../util/hbck/OfflineMetaRebuildTestCore.java      | 294 --------
 hbase-thrift/src/test/resources/hbase-site.xml     |   2 +-
 src/main/asciidoc/_chapters/community.adoc         |   3 -
 src/main/asciidoc/_chapters/configuration.adoc     |  64 +-
 src/main/asciidoc/_chapters/hbtop.adoc             |  46 +-
 src/main/asciidoc/_chapters/performance.adoc       |   2 +
 76 files changed, 3264 insertions(+), 2148 deletions(-)
 create mode 100644 hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/terminal/impl/batch/BatchTerminal.java
 copy hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/monkies/CalmChaosMonkey.java => hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/terminal/impl/batch/BatchTerminalPrinter.java (60%)
 delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/janitor/CatalogJanitor.java
 rename hbase-server/src/main/java/org/apache/hadoop/hbase/master/{ => janitor}/MetaFixer.java (89%)
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/janitor/Report.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/janitor/ReportMakingVisitor.java
 rename hbase-server/src/test/java/org/apache/hadoop/hbase/master/{ => janitor}/TestCatalogJanitor.java (95%)
 rename hbase-server/src/test/java/org/apache/hadoop/hbase/master/{ => janitor}/TestCatalogJanitorCluster.java (74%)
 rename hbase-server/src/test/java/org/apache/hadoop/hbase/master/{ => janitor}/TestCatalogJanitorInMemoryStates.java (75%)
 rename hbase-server/src/test/java/org/apache/hadoop/hbase/master/{ => janitor}/TestMetaFixer.java (85%)
 rename hbase-server/src/test/java/org/apache/hadoop/hbase/master/{ => janitor}/TestMetaFixerNoCluster.java (73%)
 delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/OfflineMetaRebuildTestCore.java


[hbase] 05/05: HBASE-24982 Disassemble the method replicateWALEntry from AdminService to a new interface ReplicationServerService (#2360)

Posted by zg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit c302a9e600802312fad79f4edba0b2e2ecf840a8
Author: XinSun <dd...@gmail.com>
AuthorDate: Wed Sep 9 15:00:37 2020 +0800

    HBASE-24982 Disassemble the method replicateWALEntry from AdminService to a new interface ReplicationServerService (#2360)
    
    Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
 .../hadoop/hbase/client/AsyncConnectionImpl.java   |  16 ++
 .../server/replication/ReplicationServer.proto     |  32 ++++
 .../hadoop/hbase/replication/ReplicationUtils.java |  19 ++
 .../hbase/client/AsyncClusterConnection.java       |   5 +
 .../hbase/client/AsyncClusterConnectionImpl.java   |   5 +
 .../hbase/client/AsyncReplicationServerAdmin.java  |  80 +++++++++
 .../hbase/protobuf/ReplicationProtbufUtil.java     |  18 ++
 .../hadoop/hbase/regionserver/RSRpcServices.java   |  10 +-
 .../replication/ReplicationServerRpcServices.java  | 200 +--------------------
 .../HBaseInterClusterReplicationEndpoint.java      |   8 +-
 .../regionserver/ReplicationSinkManager.java       |  58 +++++-
 .../hbase/client/DummyAsyncClusterConnection.java  |   5 +
 .../hbase/replication/SyncReplicationTestBase.java |   8 +-
 .../hbase/replication/TestReplicationServer.java   |  43 ++++-
 .../regionserver/TestReplicationSinkManager.java   |  17 +-
 15 files changed, 294 insertions(+), 230 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 896b7dc..c9de408 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminServic
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
 
 /**
  * The implementation of AsyncConnection.
@@ -107,6 +108,8 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
   private final ConcurrentMap<String, AdminService.Interface> adminSubs = new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, ReplicationServerService.Interface> replStubs =
+      new ConcurrentHashMap<>();
 
   private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>();
 
@@ -266,12 +269,25 @@ class AsyncConnectionImpl implements AsyncConnection {
     return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
   }
 
+  private ReplicationServerService.Interface createReplicationServerStub(ServerName serverName)
+      throws IOException {
+    return ReplicationServerService.newStub(
+        rpcClient.createRpcChannel(serverName, user, rpcTimeout));
+  }
+
   AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
     return ConcurrentMapUtils.computeIfAbsentEx(adminSubs,
       getStubKey(AdminService.Interface.class.getSimpleName(), serverName, hostnameCanChange),
       () -> createAdminServerStub(serverName));
   }
 
+  ReplicationServerService.Interface getReplicationServerStub(ServerName serverName)
+      throws IOException {
+    return ConcurrentMapUtils.computeIfAbsentEx(replStubs,
+        getStubKey(ReplicationServerService.Interface.class.getSimpleName(), serverName,
+            hostnameCanChange), () -> createReplicationServerStub(serverName));
+  }
+
   CompletableFuture<MasterService.Interface> getMasterStub() {
     return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> {
       CompletableFuture<MasterService.Interface> future = new CompletableFuture<>();
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto b/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto
new file mode 100644
index 0000000..ed334c4
--- /dev/null
+++ b/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto2";
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
+option java_outer_classname = "ReplicationServerProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+import "server/region/Admin.proto";
+
+service ReplicationServerService {
+  rpc ReplicateWALEntry(ReplicateWALEntryRequest)
+    returns(ReplicateWALEntryResponse);
+}
\ No newline at end of file
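
The new service reuses the existing ReplicateWALEntryRequest/Response messages
from server/region/Admin.proto. Because java_generic_services is enabled, the
generated ReplicationServerService class exposes both an async stub and a
blocking reflective service; a minimal sketch of the two entry points as this
commit uses them (channel and impl are assumed to be in scope):

    // Client side, as in AsyncConnectionImpl above:
    ReplicationServerService.Interface stub =
        ReplicationServerService.newStub(channel);
    // Server side, as registered in the RPC services later in this commit:
    bssi.add(new BlockingServiceAndInterface(
        ReplicationServerService.newReflectiveBlockingService(impl),
        ReplicationServerService.BlockingInterface.class));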
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index a786206..7bafbc2 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -30,6 +30,9 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncAdmin;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -212,4 +215,20 @@ public final class ReplicationUtils {
     }
     return initialValue * HConstants.RETRY_BACKOFF[ntries];
   }
+
+  /**
+   * Check whether peer cluster supports replication offload.
+   * @param peerConn connection for peer cluster
+   * @return true if peer cluster version >= 3
+   * @throws IOException exception
+   */
+  public static boolean isPeerClusterSupportReplicationOffload(AsyncConnection peerConn)
+      throws IOException {
+    AsyncAdmin admin = peerConn.getAdmin();
+    String version = FutureUtils.get(admin.getClusterMetrics()).getHBaseVersion();
+    if (Integer.parseInt(version.split("\\.")[0]) >= 3) {
+      return true;
+    }
+    return false;
+  }
 }
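
The support check above keys off nothing more than the major component of the
peer cluster's reported version string. A quick illustration of the parse
(these version strings are invented examples, not taken from this change):

    // "3.0.0-alpha-1".split("\\.") -> ["3", "0", "0-alpha-1"], major is "3"
    boolean offload = Integer.parseInt("3.0.0-alpha-1".split("\\.")[0]) >= 3; // true
    // "2.4.5".split("\\.") -> ["2", "4", "5"], major is "2"
    boolean legacy = Integer.parseInt("2.4.5".split("\\.")[0]) >= 3;          // false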
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index 92118ac..b6a3b97 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -42,6 +42,11 @@ public interface AsyncClusterConnection extends AsyncConnection {
   AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName);
 
   /**
+   * Get the admin service for the give replication server.
+   */
+  AsyncReplicationServerAdmin getReplicationServerAdmin(ServerName serverName);
+
+  /**
    * Get the nonce generator for this connection.
    */
   NonceGenerator getNonceGenerator();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
index 39fc3a2..e4c2ee3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
@@ -71,6 +71,11 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu
   }
 
   @Override
+  public AsyncReplicationServerAdmin getReplicationServerAdmin(ServerName serverName) {
+    return new AsyncReplicationServerAdmin(serverName, this);
+  }
+
+  @Override
   public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
       boolean writeFlushWALMarker) {
     RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncReplicationServerAdmin.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncReplicationServerAdmin.java
new file mode 100644
index 0000000..7511a64
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncReplicationServerAdmin.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
+
+/**
+ * A simple wrapper of the {@link ReplicationServerService} for a replication server.
+ * <p/>
+ * Notice that there is no retry, and this is intentional.
+ */
+@InterfaceAudience.Private
+public class AsyncReplicationServerAdmin {
+
+  private final ServerName server;
+
+  private final AsyncConnectionImpl conn;
+
+  AsyncReplicationServerAdmin(ServerName server, AsyncConnectionImpl conn) {
+    this.server = server;
+    this.conn = conn;
+  }
+
+  @FunctionalInterface
+  private interface RpcCall<RESP> {
+    void call(ReplicationServerService.Interface stub, HBaseRpcController controller,
+        RpcCallback<RESP> done);
+  }
+
+  private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall, CellScanner cellScanner) {
+    CompletableFuture<RESP> future = new CompletableFuture<>();
+    HBaseRpcController controller = conn.rpcControllerFactory.newController(cellScanner);
+    try {
+      rpcCall.call(conn.getReplicationServerStub(server), controller, resp -> {
+        if (controller.failed()) {
+          future.completeExceptionally(controller.getFailed());
+        } else {
+          future.complete(resp);
+        }
+      });
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+
+  public CompletableFuture<AdminProtos.ReplicateWALEntryResponse> replicateWALEntry(
+      AdminProtos.ReplicateWALEntryRequest request, CellScanner cellScanner, int timeout) {
+    return call((stub, controller, done) -> {
+      controller.setCallTimeout(timeout);
+      stub.replicateWALEntry(controller, request, done);
+    }, cellScanner);
+  }
+}
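
For orientation, a minimal usage sketch of the new wrapper (conn, serverName,
request and cellScanner are assumed to be in scope; the call shape matches the
ReplicationProtbufUtil helper added below):

    AsyncReplicationServerAdmin admin = conn.getReplicationServerAdmin(serverName);
    CompletableFuture<AdminProtos.ReplicateWALEntryResponse> future =
        admin.replicateWALEntry(request, cellScanner, 60000); // timeout in ms
    AdminProtos.ReplicateWALEntryResponse resp = FutureUtils.get(future);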
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index 4e2e577..8995d77 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
 import org.apache.hadoop.hbase.io.SizedCellScanner;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.hadoop.hbase.util.FutureUtils;
@@ -62,6 +63,23 @@ public class ReplicationProtbufUtil {
   }
 
   /**
+   * A helper to replicate a list of WAL entries using replication server admin
+   * @param admin the replication server admin
+   * @param entries Array of WAL entries to be replicated
+   * @param replicationClusterId Id which will uniquely identify source cluster FS client
+   *          configurations in the replication configuration directory
+   * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
+   * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
+   */
+  public static void replicateWALEntry(AsyncReplicationServerAdmin admin, Entry[] entries,
+      String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir,
+      int timeout) throws IOException {
+    Pair<ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(entries, null,
+        replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
+    FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout));
+  }
+
+  /**
    * Create a new ReplicateWALEntryRequest from a list of WAL entries
    * @param entries the WAL entries to be replicated
    * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values found.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 7f71eb9..95de7ae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -254,6 +254,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuo
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
@@ -267,7 +268,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe
 @SuppressWarnings("deprecation")
 public class RSRpcServices implements HBaseRPCErrorHandler,
     AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction,
-    ConfigurationObserver {
+    ConfigurationObserver, ReplicationServerService.BlockingInterface {
   protected static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);
 
   /** RPC scheduler to use for the region server. */
@@ -1487,8 +1488,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     }
     if (admin) {
       bssi.add(new BlockingServiceAndInterface(
-      AdminService.newReflectiveBlockingService(this),
-      AdminService.BlockingInterface.class));
+          AdminService.newReflectiveBlockingService(this),
+          AdminService.BlockingInterface.class));
+      bssi.add(new BlockingServiceAndInterface(
+          ReplicationServerService.newReflectiveBlockingService(this),
+          ReplicationServerService.BlockingInterface.class));
     }
     return new org.apache.hbase.thirdparty.com.google.common.collect.
         ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
index 1b9b699..15d4f8c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
@@ -27,14 +27,12 @@ import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.PriorityFunction;
-import org.apache.hadoop.hbase.ipc.QosPriority;
 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
 import org.apache.hadoop.hbase.ipc.RpcServerFactory;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
@@ -58,53 +56,11 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
@@ -117,7 +73,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 @InterfaceAudience.Private
 @SuppressWarnings("deprecation")
 public class ReplicationServerRpcServices implements HBaseRPCErrorHandler,
-    AdminService.BlockingInterface, PriorityFunction {
+    ReplicationServerService.BlockingInterface, PriorityFunction {
 
   protected static final Logger LOG = LoggerFactory.getLogger(ReplicationServerRpcServices.class);
 
@@ -256,8 +212,8 @@ public class ReplicationServerRpcServices implements HBaseRPCErrorHandler,
   protected List<BlockingServiceAndInterface> getServices() {
     List<BlockingServiceAndInterface> bssi = new ArrayList<>();
     bssi.add(new BlockingServiceAndInterface(
-      AdminService.newReflectiveBlockingService(this),
-      AdminService.BlockingInterface.class));
+      ReplicationServerService.newReflectiveBlockingService(this),
+        ReplicationServerService.BlockingInterface.class));
     return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
   }
 
@@ -325,154 +281,6 @@ public class ReplicationServerRpcServices implements HBaseRPCErrorHandler,
     }
   }
 
-  @Override
-  public GetRegionInfoResponse getRegionInfo(RpcController controller, GetRegionInfoRequest request)
-      throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public GetStoreFileResponse getStoreFile(RpcController controller, GetStoreFileRequest request)
-      throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public GetOnlineRegionResponse getOnlineRegion(RpcController controller,
-      GetOnlineRegionRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public OpenRegionResponse openRegion(RpcController controller, OpenRegionRequest request)
-      throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public WarmupRegionResponse warmupRegion(RpcController controller, WarmupRegionRequest request)
-      throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public CloseRegionResponse closeRegion(RpcController controller, CloseRegionRequest request)
-      throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public FlushRegionResponse flushRegion(RpcController controller, FlushRegionRequest request)
-      throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public CompactionSwitchResponse compactionSwitch(RpcController controller,
-      CompactionSwitchRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public CompactRegionResponse compactRegion(RpcController controller,
-      CompactRegionRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public ReplicateWALEntryResponse replay(RpcController controller,
-      ReplicateWALEntryRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public RollWALWriterResponse rollWALWriter(RpcController controller, RollWALWriterRequest request)
-      throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public GetServerInfoResponse getServerInfo(RpcController controller, GetServerInfoRequest request)
-      throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  /**
-   * Stop the replication server.
-   *
-   * @param controller the RPC controller
-   * @param request the request
-   */
-  @Override
-  @QosPriority(priority=HConstants.ADMIN_QOS)
-  public StopServerResponse stopServer(final RpcController controller,
-      final StopServerRequest request) {
-    requestCount.increment();
-    String reason = request.getReason();
-    replicationServer.stop(reason);
-    return StopServerResponse.newBuilder().build();
-  }
-
-  @Override
-  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
-      UpdateFavoredNodesRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public UpdateConfigurationResponse updateConfiguration(RpcController controller,
-      UpdateConfigurationRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public GetRegionLoadResponse getRegionLoad(RpcController controller,
-      GetRegionLoadRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
-      ClearCompactionQueuesRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
-      ClearRegionBlockCacheRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
-      GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public ExecuteProceduresResponse executeProcedures(RpcController controller,
-      ExecuteProceduresRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public SlowLogResponses getSlowLogResponses(RpcController controller,
-      SlowLogResponseRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public SlowLogResponses getLargeLogResponses(RpcController controller,
-      SlowLogResponseRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
-  @Override
-  public ClearSlowLogResponses clearSlowLogsResponses(RpcController controller,
-      ClearSlowLogResponseRequest request) throws ServiceException {
-    throw new ServiceException(new UnsupportedOperationException("This's Replication Server"));
-  }
-
   protected AccessChecker getAccessChecker() {
     return accessChecker;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 6a407e2..20d870a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -51,12 +51,10 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
-import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
@@ -624,11 +622,9 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
           logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
       }
       sinkPeer = replicationSinkMgr.getReplicationSink();
-      AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
       try {
-        ReplicationProtbufUtil.replicateWALEntry(rsAdmin,
-          entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir,
-          hfileArchiveDir, timeout);
+        sinkPeer.replicateWALEntry(entries.toArray(new Entry[entries.size()]), replicationClusterId,
+            baseNamespaceDir, hfileArchiveDir, timeout);
         if (LOG.isTraceEnabled()) {
           LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
         }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
index db12dc0..f54b986 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
@@ -23,10 +23,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
+import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -110,7 +115,7 @@ public class ReplicationSinkManager {
       throw new IOException("No replication sinks are available");
     }
     ServerName serverName = sinks.get(ThreadLocalRandom.current().nextInt(sinks.size()));
-    return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName));
+    return createSinkPeer(serverName);
   }
 
   /**
@@ -173,21 +178,60 @@ public class ReplicationSinkManager {
    * Wraps a replication region server sink to provide the ability to identify
    * it.
    */
-  public static class SinkPeer {
+  public static abstract class SinkPeer {
     private ServerName serverName;
-    private AsyncRegionServerAdmin regionServer;
 
-    public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) {
+    public SinkPeer(ServerName serverName) {
       this.serverName = serverName;
-      this.regionServer = regionServer;
     }
 
     ServerName getServerName() {
       return serverName;
     }
 
-    public AsyncRegionServerAdmin getRegionServer() {
-      return regionServer;
+    public abstract void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId,
+        Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) throws IOException;
+  }
+
+  public static class RegionServerSinkPeer extends SinkPeer {
+
+    private AsyncRegionServerAdmin regionServer;
+
+    public RegionServerSinkPeer(ServerName serverName,
+        AsyncRegionServerAdmin replicationServer) {
+      super(serverName);
+      this.regionServer = replicationServer;
+    }
+
+    public void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId,
+        Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) throws IOException {
+      ReplicationProtbufUtil.replicateWALEntry(regionServer, entries, replicationClusterId,
+          sourceBaseNamespaceDir, sourceHFileArchiveDir, timeout);
+    }
+  }
+
+  public static class ReplicationServerSinkPeer extends SinkPeer {
+
+    private AsyncReplicationServerAdmin replicationServer;
+
+    public ReplicationServerSinkPeer(ServerName serverName,
+        AsyncReplicationServerAdmin replicationServer) {
+      super(serverName);
+      this.replicationServer = replicationServer;
+    }
+
+    public void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId,
+        Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) throws IOException {
+      ReplicationProtbufUtil.replicateWALEntry(replicationServer, entries, replicationClusterId,
+          sourceBaseNamespaceDir, sourceHFileArchiveDir, timeout);
+    }
+  }
+
+  private SinkPeer createSinkPeer(ServerName serverName) throws IOException {
+    if (ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) {
+      return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName));
+    } else {
+      return new RegionServerSinkPeer(serverName, conn.getRegionServerAdmin(serverName));
     }
   }
 }
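
The point of the SinkPeer split above is that the shipping path stays
endpoint-agnostic: createSinkPeer picks the subclass once, and callers invoke
the same method either way. A sketch of the caller's view, mirroring the
HBaseInterClusterReplicationEndpoint hunk earlier in this commit (the
arguments are assumed from that method's scope):

    SinkPeer sinkPeer = replicationSinkMgr.getReplicationSink();
    sinkPeer.replicateWALEntry(entries.toArray(new Entry[entries.size()]),
        replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout);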
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
index 8755749..5af4086 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
@@ -109,6 +109,11 @@ public class DummyAsyncClusterConnection implements AsyncClusterConnection {
   }
 
   @Override
+  public AsyncReplicationServerAdmin getReplicationServerAdmin(ServerName serverName) {
+    return null;
+  }
+
+  @Override
   public NonceGenerator getNonceGenerator() {
     return null;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
index f11bd49..9ed584b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -267,13 +267,13 @@ public class SyncReplicationTestBase {
     }
     if (!expectedRejection) {
       ReplicationProtbufUtil.replicateWALEntry(
-        connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
-        HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
+          connection.getReplicationServerAdmin(regionServer.getServerName()), entries, null, null,
+          null, HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
     } else {
       try {
         ReplicationProtbufUtil.replicateWALEntry(
-          connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
-          HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
+          connection.getReplicationServerAdmin(regionServer.getServerName()), entries, null, null,
+            null, HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
         fail("Should throw IOException when sync-replication state is in A or DA");
       } catch (RemoteException e) {
         assertRejection(e.unwrapRemoteException());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
index 6a0ef3d..97a1bac 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
@@ -30,14 +30,16 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
-import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.ReplicationServerSinkPeer;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -116,22 +118,47 @@ public class TestReplicationServer {
     TEST_UTIL.deleteTableIfAny(TABLENAME);
   }
 
+  /**
+   * Requests replication server using {@link AsyncReplicationServerAdmin}
+   */
   @Test
   public void testReplicateWAL() throws Exception {
-    AsyncClusterConnection conn = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().get(0)
-        .getRegionServer().getAsyncClusterConnection();
-    AsyncRegionServerAdmin rsAdmin = conn.getRegionServerAdmin(replicationServer.getServerName());
+    AsyncClusterConnection conn =
+        TEST_UTIL.getHBaseCluster().getMaster().getAsyncClusterConnection();
+    AsyncReplicationServerAdmin replAdmin =
+        conn.getReplicationServerAdmin(replicationServer.getServerName());
+
+    ReplicationServerSinkPeer sinkPeer =
+        new ReplicationServerSinkPeer(replicationServer.getServerName(), replAdmin);
+    replicateWALEntryAndVerify(sinkPeer);
+  }
+
+  /**
+   * Requests region server using {@link AsyncReplicationServerAdmin}
+   */
+  @Test
+  public void testReplicateWAL2() throws Exception {
+    AsyncClusterConnection conn =
+        TEST_UTIL.getHBaseCluster().getMaster().getAsyncClusterConnection();
+    ServerName rs = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().get(0)
+        .getRegionServer().getServerName();
+    AsyncReplicationServerAdmin replAdmin = conn.getReplicationServerAdmin(rs);
+
+    ReplicationServerSinkPeer sinkPeer = new ReplicationServerSinkPeer(rs, replAdmin);
+    replicateWALEntryAndVerify(sinkPeer);
+  }
 
+  private void replicateWALEntryAndVerify(SinkPeer sinkPeer) throws Exception {
     Entry[] entries = new Entry[BATCH_SIZE];
     for(int i = 0; i < BATCH_SIZE; i++) {
       entries[i] = generateEdit(i, TABLENAME, Bytes.toBytes(i));
     }
 
-    ReplicationProtbufUtil.replicateWALEntry(rsAdmin, entries, replicationClusterId,
-        baseNamespaceDir, hfileArchiveDir, 1000);
+    sinkPeer.replicateWALEntry(entries, replicationClusterId, baseNamespaceDir, hfileArchiveDir,
+        1000);
 
+    Table table = TEST_UTIL.getConnection().getTable(TABLENAME);
     for (int i = 0; i < BATCH_SIZE; i++) {
-      Table table = TEST_UTIL.getConnection().getTable(TABLENAME);
       Result result = table.get(new Get(Bytes.toBytes(i)));
       Cell cell = result.getColumnLatestCell(Bytes.toBytes(FAMILY), Bytes.toBytes(FAMILY));
       assertNotNull(cell);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
index f8a2ab9..f2a2508 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
@@ -25,8 +25,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
-import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.AsyncReplicationServerAdmin;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.ReplicationServerSinkPeer;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -110,7 +111,7 @@ public class TestReplicationSinkManager {
     // Sanity check
     assertEquals(1, sinkManager.getNumSinks());
 
-    SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
+    SinkPeer sinkPeer = mockSinkPeer(serverNameA);
 
     sinkManager.reportBadSink(sinkPeer);
 
@@ -138,7 +139,7 @@ public class TestReplicationSinkManager {
 
     ServerName serverName = sinkManager.getSinksForTesting().get(0);
 
-    SinkPeer sinkPeer = new SinkPeer(serverName, mock(AsyncRegionServerAdmin.class));
+    SinkPeer sinkPeer = mockSinkPeer(serverName);
 
     sinkManager.reportSinkSuccess(sinkPeer); // has no effect, counter does not go negative
     for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
@@ -154,7 +155,7 @@ public class TestReplicationSinkManager {
     //
     serverName = sinkManager.getSinksForTesting().get(0);
 
-    sinkPeer = new SinkPeer(serverName, mock(AsyncRegionServerAdmin.class));
+    sinkPeer = mockSinkPeer(serverName);
     for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-1; i++) {
       sinkManager.reportBadSink(sinkPeer);
     }
@@ -192,8 +193,8 @@ public class TestReplicationSinkManager {
     ServerName serverNameA = sinkList.get(0);
     ServerName serverNameB = sinkList.get(1);
 
-    SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
-    SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class));
+    SinkPeer sinkPeerA = mockSinkPeer(serverNameA);
+    SinkPeer sinkPeerB = mockSinkPeer(serverNameB);
 
     for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
       sinkManager.reportBadSink(sinkPeerA);
@@ -207,4 +208,8 @@ public class TestReplicationSinkManager {
     assertEquals(expected, sinkManager.getNumSinks());
   }
 
+  private SinkPeer mockSinkPeer(ServerName serverName) {
+    return new ReplicationServerSinkPeer(serverName, mock(AsyncReplicationServerAdmin.class));
+  }
+
 }
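
For orientation, the bad-sink accounting these tests exercise can be pictured with the sketch
below. This is only an illustration: apart from reportBadSink/reportSinkSuccess and
DEFAULT_BAD_SINK_THRESHOLD, every name and method body here is an assumption inferred from the
assertions above, not the actual ReplicationSinkManager code.

    // Hypothetical sketch of per-sink bad-report counting (not the real implementation).
    private final Map<ServerName, Integer> badReportCounts = new HashMap<>();

    void reportBadSink(SinkPeer sinkPeer) {
      ServerName serverName = sinkPeer.getServerName();
      int badCount = badReportCounts.merge(serverName, 1, Integer::sum);
      if (badCount > DEFAULT_BAD_SINK_THRESHOLD) {
        // Once a sink exceeds the threshold it is dropped from the sink list.
        sinks.remove(serverName);
        badReportCounts.remove(serverName);
      }
    }

    void reportSinkSuccess(SinkPeer sinkPeer) {
      // Decrement the counter but never let it go negative, matching the test above.
      badReportCounts.computeIfPresent(sinkPeer.getServerName(),
        (name, count) -> count <= 1 ? null : count - 1);
    }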


[hbase] 01/05: HBASE-24682 Refactor ReplicationSource#addHFileRefs method: move it to ReplicationSourceManager (#2020)

zghao pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 8e1bf74d0a9969c2241fd6cab2c22208cdc8f8a6
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Wed Jul 8 14:29:08 2020 +0800

    HBASE-24682 Refactor ReplicationSource#addHFileRefs method: move it to ReplicationSourceManager (#2020)
    
    Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
 .../regionserver/ReplicationSource.java            | 38 +----------------
 .../regionserver/ReplicationSourceInterface.java   | 14 -------
 .../regionserver/ReplicationSourceManager.java     | 48 +++++++++++++++++++++-
 .../hbase/replication/ReplicationSourceDummy.java  |  9 +---
 4 files changed, 49 insertions(+), 60 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index f24ecfa..85c4657 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -28,7 +28,6 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -37,6 +36,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -46,21 +46,17 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableDescriptors;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
 import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
 import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -265,38 +261,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
     }
   }
 
-  @Override
-  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
-      throws ReplicationException {
-    String peerId = replicationPeer.getId();
-    Set<String> namespaces = replicationPeer.getNamespaces();
-    Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
-    if (tableCFMap != null) { // All peers with TableCFs
-      List<String> tableCfs = tableCFMap.get(tableName);
-      if (tableCFMap.containsKey(tableName)
-          && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
-        this.queueStorage.addHFileRefs(peerId, pairs);
-        metrics.incrSizeOfHFileRefsQueue(pairs.size());
-      } else {
-        LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
-            tableName, Bytes.toString(family), peerId);
-      }
-    } else if (namespaces != null) { // Only for set NAMESPACES peers
-      if (namespaces.contains(tableName.getNamespaceAsString())) {
-        this.queueStorage.addHFileRefs(peerId, pairs);
-        metrics.incrSizeOfHFileRefsQueue(pairs.size());
-      } else {
-        LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
-            tableName, Bytes.toString(family), peerId);
-      }
-    } else {
-      // user has explicitly not defined any table cfs for replication, means replicate all the
-      // data
-      this.queueStorage.addHFileRefs(peerId, pairs);
-      metrics.incrSizeOfHFileRefsQueue(pairs.size());
-    }
-  }
-
   private ReplicationEndpoint createReplicationEndpoint()
       throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
     RegionServerCoprocessorHost rsServerHost = null;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 0bd90cf..33a413f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -29,12 +29,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -63,17 +60,6 @@ public interface ReplicationSourceInterface {
   void enqueueLog(Path log);
 
   /**
-   * Add hfile names to the queue to be replicated.
-   * @param tableName Name of the table these files belongs to
-   * @param family Name of the family these files belong to
-   * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
-   *          will be added in the queue for replication}
-   * @throws ReplicationException If failed to add hfile references
-   */
-  void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
-      throws ReplicationException;
-
-  /**
    * Start the replication
    */
   void startup();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 0940b5a..3869857 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
@@ -173,6 +174,8 @@ public class ReplicationSourceManager implements ReplicationListener {
   private final long totalBufferLimit;
   private final MetricsReplicationGlobalSourceSource globalMetrics;
 
+  private final Map<String, MetricsSource> sourceMetrics = new HashMap<>();
+
   /**
    * Creates a replication manager and sets the watch on all the other registered region servers
    * @param queueStorage the interface for manipulating replication queues
@@ -355,6 +358,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);
 
     MetricsSource metrics = new MetricsSource(queueId);
+    sourceMetrics.put(queueId, metrics);
     // init replication source
     src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
       walFileLengthProvider, metrics);
@@ -1139,7 +1143,49 @@ public class ReplicationSourceManager implements ReplicationListener {
   public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
       throws IOException {
     for (ReplicationSourceInterface source : this.sources.values()) {
-      throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs));
+      throwIOExceptionWhenFail(() -> addHFileRefs(source.getPeerId(), tableName, family, pairs));
+    }
+  }
+
+  /**
+   * Add hfile names to the queue to be replicated.
+   * @param peerId the replication peer id
+   * @param tableName Name of the table these files belong to
+   * @param family Name of the family these files belong to
+   * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir } which
+   *          will be added to the queue for replication
+   * @throws ReplicationException if adding the hfile references failed
+   */
+  private void addHFileRefs(String peerId, TableName tableName, byte[] family,
+    List<Pair<Path, Path>> pairs) throws ReplicationException {
+    // Only normal replication sources are updated here; their peerId equals the queueId.
+    MetricsSource metrics = sourceMetrics.get(peerId);
+    ReplicationPeer replicationPeer = replicationPeers.getPeer(peerId);
+    Set<String> namespaces = replicationPeer.getNamespaces();
+    Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
+    if (tableCFMap != null) { // All peers with TableCFs
+      List<String> tableCfs = tableCFMap.get(tableName);
+      if (tableCFMap.containsKey(tableName)
+        && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
+        this.queueStorage.addHFileRefs(peerId, pairs);
+        metrics.incrSizeOfHFileRefsQueue(pairs.size());
+      } else {
+        LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
+          tableName, Bytes.toString(family), peerId);
+      }
+    } else if (namespaces != null) { // Only for set NAMESPACES peers
+      if (namespaces.contains(tableName.getNamespaceAsString())) {
+        this.queueStorage.addHFileRefs(peerId, pairs);
+        metrics.incrSizeOfHFileRefsQueue(pairs.size());
+      } else {
+        LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
+          tableName, Bytes.toString(family), peerId);
+      }
+    } else {
+      // The user has explicitly not defined any table CFs for replication, which means
+      // replicate all the data.
+      this.queueStorage.addHFileRefs(peerId, pairs);
+      metrics.incrSizeOfHFileRefsQueue(pairs.size());
     }
   }
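
To make the scope checks above concrete, here is a hedged walk-through (the peer configuration
is a hypothetical example; the decision order mirrors the method in the diff):

    // Hypothetical peer: tableCFMap = { ns:t1 -> null }, namespaces = null.
    // addHFileRefs(peerId, TableName.valueOf("ns", "t1"), family, pairs)
    //   -> tableCFMap contains ns:t1 and tableCfs == null (meaning: all families)
    //   -> enqueue the refs and bump the hfile-refs-queue metric
    // addHFileRefs(peerId, TableName.valueOf("ns", "t2"), family, pairs)
    //   -> tableCFMap does not contain ns:t2 -> skip, log at DEBUG
    // With tableCFMap == null and namespaces == null the peer replicates everything,
    // so the refs are always enqueued.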
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index a361c44..781a1da 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -21,17 +21,16 @@ import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
 /**
@@ -114,12 +113,6 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   }
 
   @Override
-  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> files)
-      throws ReplicationException {
-    return;
-  }
-
-  @Override
   public boolean isPeerEnabled() {
     return true;
   }


[hbase] 02/05: HBASE-24681 Remove the cache walsById/walsByIdRecoveredQueues from ReplicationSourceManager (#2019)

zghao pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit d90c20e344f624877362525dc56a645b5c92ca82
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Mon Jul 13 17:35:32 2020 +0800

    HBASE-24681 Remove the cache walsById/walsByIdRecoveredQueues from ReplicationSourceManager (#2019)
    
    Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
 .../regionserver/ReplicationSourceManager.java     | 204 +++++++--------------
 1 file changed, 62 insertions(+), 142 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 3869857..a222f4b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -91,30 +91,6 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
  * <li>No need synchronized on {@link #sources}. {@link #sources} is a ConcurrentHashMap and there
  * is a Lock for peer id in {@link PeerProcedureHandlerImpl}. So there is no race for peer
  * operations.</li>
- * <li>Need synchronized on {@link #walsById}. There are four methods which modify it,
- * {@link #addPeer(String)}, {@link #removePeer(String)},
- * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and {@link #preLogRoll(Path)}.
- * {@link #walsById} is a ConcurrentHashMap and there is a Lock for peer id in
- * {@link PeerProcedureHandlerImpl}. So there is no race between {@link #addPeer(String)} and
- * {@link #removePeer(String)}. {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)}
- * is called by {@link ReplicationSourceInterface}. So no race with {@link #addPeer(String)}.
- * {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} firstly, then
- * remove the wals from {@link #walsById}. So no race with {@link #removePeer(String)}. The only
- * case need synchronized is {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
- * {@link #preLogRoll(Path)}.</li>
- * <li>No need synchronized on {@link #walsByIdRecoveredQueues}. There are three methods which
- * modify it, {@link #removePeer(String)} ,
- * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
- * {@link ReplicationSourceManager.NodeFailoverWorker#run()}.
- * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} is called by
- * {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the
- * {@link ReplicationSourceInterface} firstly, then remove the wals from
- * {@link #walsByIdRecoveredQueues}. And {@link ReplicationSourceManager.NodeFailoverWorker#run()}
- * will add the wals to {@link #walsByIdRecoveredQueues} firstly, then start up a
- * {@link ReplicationSourceInterface}. So there is no race here. For
- * {@link ReplicationSourceManager.NodeFailoverWorker#run()} and {@link #removePeer(String)}, there
- * is already synchronized on {@link #oldsources}. So no need synchronized on
- * {@link #walsByIdRecoveredQueues}.</li>
  * <li>Need synchronized on {@link #latestPaths} to avoid the new open source miss new log.</li>
  * <li>Need synchronized on {@link #oldsources} to avoid adding recovered source for the
  * to-be-removed peer.</li>
@@ -135,15 +111,6 @@ public class ReplicationSourceManager implements ReplicationListener {
   // All about stopping
   private final Server server;
 
-  // All logs we are currently tracking
-  // Index structure of the map is: queue_id->logPrefix/logGroup->logs
-  // For normal replication source, the peer id is same with the queue id
-  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsById;
-  // Logs for recovered sources we are currently tracking
-  // the map is: queue_id->logPrefix/logGroup->logs
-  // For recovered source, the queue id's format is peer_id-servername-*
-  private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues;
-
   private final SyncReplicationPeerMappingManager syncReplicationPeerMappingManager;
 
   private final Configuration conf;
@@ -199,8 +166,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     this.replicationPeers = replicationPeers;
     this.replicationTracker = replicationTracker;
     this.server = server;
-    this.walsById = new ConcurrentHashMap<>();
-    this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
     this.oldsources = new ArrayList<>();
     this.conf = conf;
     this.fs = fs;
@@ -338,7 +303,6 @@ public class ReplicationSourceManager implements ReplicationListener {
       // Delete queue from storage and memory and queue id is same with peer id for normal
       // source
       deleteQueue(peerId);
-      this.walsById.remove(peerId);
     }
     ReplicationPeerConfig peerConfig = peer.getPeerConfig();
     if (peerConfig.isSyncReplication()) {
@@ -379,15 +343,10 @@ public class ReplicationSourceManager implements ReplicationListener {
     // synchronized on latestPaths to avoid missing the new log
     synchronized (this.latestPaths) {
       this.sources.put(peerId, src);
-      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
-      this.walsById.put(peerId, walsByGroup);
       // Add the latest wal to that source's queue
       if (!latestPaths.isEmpty()) {
         for (Map.Entry<String, Path> walPrefixAndPath : latestPaths.entrySet()) {
           Path walPath = walPrefixAndPath.getValue();
-          NavigableSet<String> wals = new TreeSet<>();
-          wals.add(walPath.getName());
-          walsByGroup.put(walPrefixAndPath.getKey(), wals);
           // Abort RS and throw exception to make add peer failed
           abortAndThrowIOExceptionWhenFail(
             () -> this.queueStorage.addWAL(server.getServerName(), peerId, walPath.getName()));
@@ -441,7 +400,10 @@ public class ReplicationSourceManager implements ReplicationListener {
       // map from walsById since later we may fail to delete them from the replication queue
       // storage, and when we retry next time, we can not know the wal files that need to be deleted
       // from the replication queue storage.
-      walsById.get(peerId).forEach((k, v) -> wals.put(k, new TreeSet<>(v)));
+      this.queueStorage.getWALsInQueue(this.server.getServerName(), peerId).forEach(wal -> {
+        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+        wals.computeIfAbsent(walPrefix, p -> new TreeSet<>()).add(wal);
+      });
     }
     LOG.info("Startup replication source for " + src.getPeerId());
     src.startup();
@@ -450,15 +412,6 @@ public class ReplicationSourceManager implements ReplicationListener {
         queueStorage.removeWAL(server.getServerName(), peerId, wal);
       }
     }
-    synchronized (walsById) {
-      Map<String, NavigableSet<String>> oldWals = walsById.get(peerId);
-      wals.forEach((k, v) -> {
-        NavigableSet<String> walsByGroup = oldWals.get(k);
-        if (walsByGroup != null) {
-          walsByGroup.removeAll(v);
-        }
-      });
-    }
     // synchronized on oldsources to avoid race with NodeFailoverWorker. Since NodeFailoverWorker is
     // a background task, we will delete the file from replication queue storage under the lock to
     // simplify the logic.
@@ -470,7 +423,6 @@ public class ReplicationSourceManager implements ReplicationListener {
           oldSource.terminate(terminateMessage);
           oldSource.getSourceMetrics().clear();
           queueStorage.removeQueue(server.getServerName(), queueId);
-          walsByIdRecoveredQueues.remove(queueId);
           iter.remove();
         }
       }
@@ -483,7 +435,7 @@ public class ReplicationSourceManager implements ReplicationListener {
    * replication queue storage and only to enqueue all logs to the new replication source
    * @param peerId the id of the replication peer
    */
-  public void refreshSources(String peerId) throws IOException {
+  public void refreshSources(String peerId) throws ReplicationException, IOException {
     String terminateMessage = "Peer " + peerId +
       " state or config changed. Will close the previous replication source and open a new one";
     ReplicationPeer peer = replicationPeers.getPeer(peerId);
@@ -496,9 +448,8 @@ public class ReplicationSourceManager implements ReplicationListener {
         // Do not clear metrics
         toRemove.terminate(terminateMessage, null, false);
       }
-      for (NavigableSet<String> walsByGroup : walsById.get(peerId).values()) {
-        walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
-      }
+      this.queueStorage.getWALsInQueue(this.server.getServerName(), peerId)
+        .forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
     }
     LOG.info("Startup replication source for " + src.getPeerId());
     src.startup();
@@ -519,9 +470,8 @@ public class ReplicationSourceManager implements ReplicationListener {
       for (String queueId : previousQueueIds) {
         ReplicationSourceInterface recoveredReplicationSource = createSource(queueId, peer);
         this.oldsources.add(recoveredReplicationSource);
-        for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) {
-          walsByGroup.forEach(wal -> recoveredReplicationSource.enqueueLog(new Path(wal)));
-        }
+        this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId)
+          .forEach(wal -> recoveredReplicationSource.enqueueLog(new Path(wal)));
         toStartup.add(recoveredReplicationSource);
       }
     }
@@ -541,7 +491,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     LOG.info("Done with the recovered queue {}", src.getQueueId());
     // Delete queue from storage and memory
     deleteQueue(src.getQueueId());
-    this.walsByIdRecoveredQueues.remove(src.getQueueId());
     return true;
   }
 
@@ -564,8 +513,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     this.sources.remove(src.getPeerId());
     // Delete queue from storage and memory
     deleteQueue(src.getQueueId());
-    this.walsById.remove(src.getQueueId());
-
   }
 
   /**
@@ -651,42 +598,19 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @param source the replication source
    */
   @VisibleForTesting
-  void cleanOldLogs(String log, boolean inclusive, ReplicationSourceInterface source) {
-    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
-    if (source.isRecovered()) {
-      NavigableSet<String> wals = walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix);
-      if (wals != null) {
-        NavigableSet<String> walsToRemove = wals.headSet(log, inclusive);
-        if (walsToRemove.isEmpty()) {
-          return;
-        }
-        cleanOldLogs(walsToRemove, source);
-        walsToRemove.clear();
-      }
-    } else {
-      NavigableSet<String> wals;
-      NavigableSet<String> walsToRemove;
-      // synchronized on walsById to avoid race with preLogRoll
-      synchronized (this.walsById) {
-        wals = walsById.get(source.getQueueId()).get(logPrefix);
-        if (wals == null) {
-          return;
-        }
-        walsToRemove = wals.headSet(log, inclusive);
-        if (walsToRemove.isEmpty()) {
-          return;
-        }
-        walsToRemove = new TreeSet<>(walsToRemove);
-      }
-      // cleanOldLogs may spend some time, especially for sync replication where we may want to
-      // remove remote wals as the remote cluster may have already been down, so we do it outside
-      // the lock to avoid block preLogRoll
-      cleanOldLogs(walsToRemove, source);
-      // now let's remove the files in the set
-      synchronized (this.walsById) {
-        wals.removeAll(walsToRemove);
-      }
+  void cleanOldLogs(String log, boolean inclusive, ReplicationSourceInterface source) {
+    NavigableSet<String> walsToRemove;
+    synchronized (this.latestPaths) {
+      walsToRemove = getWalsToRemove(source.getQueueId(), log, inclusive);
     }
+    if (walsToRemove.isEmpty()) {
+      return;
+    }
+    // cleanOldLogs may spend some time, especially for sync replication where we may want to
+    // remove remote wals as the remote cluster may already be down, so we do it outside
+    // the lock to avoid blocking preLogRoll.
+    cleanOldLogs(walsToRemove, source);
   }
 
   private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
@@ -767,37 +691,6 @@ public class ReplicationSourceManager implements ReplicationListener {
         abortAndThrowIOExceptionWhenFail(
           () -> this.queueStorage.addWAL(server.getServerName(), source.getQueueId(), logName));
       }
-
-      // synchronized on walsById to avoid race with cleanOldLogs
-      synchronized (this.walsById) {
-        // Update walsById map
-        for (Map.Entry<String, Map<String, NavigableSet<String>>> entry : this.walsById
-          .entrySet()) {
-          String peerId = entry.getKey();
-          Map<String, NavigableSet<String>> walsByPrefix = entry.getValue();
-          boolean existingPrefix = false;
-          for (Map.Entry<String, NavigableSet<String>> walsEntry : walsByPrefix.entrySet()) {
-            SortedSet<String> wals = walsEntry.getValue();
-            if (this.sources.isEmpty()) {
-              // If there's no slaves, don't need to keep the old wals since
-              // we only consider the last one when a new slave comes in
-              wals.clear();
-            }
-            if (logPrefix.equals(walsEntry.getKey())) {
-              wals.add(logName);
-              existingPrefix = true;
-            }
-          }
-          if (!existingPrefix) {
-            // The new log belongs to a new group, add it into this peer
-            LOG.debug("Start tracking logs for wal group {} for peer {}", logPrefix, peerId);
-            NavigableSet<String> wals = new TreeSet<>();
-            wals.add(logName);
-            walsByPrefix.put(logPrefix, wals);
-          }
-        }
-      }
-
       // Add to latestPaths
       latestPaths.put(logPrefix, newLog);
     }
@@ -969,18 +862,6 @@ public class ReplicationSourceManager implements ReplicationListener {
                 continue;
               }
             }
-            // track sources in walsByIdRecoveredQueues
-            Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
-            walsByIdRecoveredQueues.put(queueId, walsByGroup);
-            for (String wal : walsSet) {
-              String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
-              NavigableSet<String> wals = walsByGroup.get(walPrefix);
-              if (wals == null) {
-                wals = new TreeSet<>();
-                walsByGroup.put(walPrefix, wals);
-              }
-              wals.add(wal);
-            }
             oldsources.add(src);
             LOG.trace("Added source for recovered queue: " + src.getQueueId());
             for (String wal : walsSet) {
@@ -1012,7 +893,18 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @return a sorted set of wal names
    */
   @VisibleForTesting
-  public Map<String, Map<String, NavigableSet<String>>> getWALs() {
+  public Map<String, Map<String, NavigableSet<String>>> getWALs()
+    throws ReplicationException {
+    Map<String, Map<String, NavigableSet<String>>> walsById = new HashMap<>();
+    for (ReplicationSourceInterface source : sources.values()) {
+      String queueId = source.getQueueId();
+      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
+      walsById.put(queueId, walsByGroup);
+      for (String wal : this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId)) {
+        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+        walsByGroup.computeIfAbsent(walPrefix, p -> new TreeSet<>()).add(wal);
+      }
+    }
     return Collections.unmodifiableMap(walsById);
   }
 
@@ -1021,7 +913,18 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @return a sorted set of wal names
    */
   @VisibleForTesting
-  Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues() {
+  Map<String, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues()
+    throws ReplicationException {
+    Map<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues = new HashMap<>();
+    for (ReplicationSourceInterface source : oldsources) {
+      String queueId = source.getQueueId();
+      Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
+      walsByIdRecoveredQueues.put(queueId, walsByGroup);
+      for (String wal : this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId)) {
+        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+        walsByGroup.computeIfAbsent(walPrefix, p -> new TreeSet<>()).add(wal);
+      }
+    }
     return Collections.unmodifiableMap(walsByIdRecoveredQueues);
   }
 
@@ -1200,4 +1103,21 @@ public class ReplicationSourceManager implements ReplicationListener {
   MetricsReplicationGlobalSourceSource getGlobalMetrics() {
     return this.globalMetrics;
   }
+
+  private NavigableSet<String> getWalsToRemove(String queueId, String log, boolean inclusive) {
+    NavigableSet<String> walsToRemove = new TreeSet<>();
+    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
+    try {
+      this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId).forEach(wal -> {
+        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+        if (walPrefix.equals(logPrefix)) {
+          walsToRemove.add(wal);
+        }
+      });
+    } catch (ReplicationException e) {
+      // Just log the exception here, as the recovered replication source will try to clean up again.
+      LOG.warn("Failed to read wals in queue {}", queueId, e);
+    }
+    return walsToRemove.headSet(log, inclusive);
+  }
 }
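
The recurring pattern in this commit is to rebuild the per-group WAL view on demand from
ReplicationQueueStorage instead of maintaining the walsById/walsByIdRecoveredQueues caches.
A minimal sketch of that grouping, using the same calls as the hunks above (queueStorage,
server and queueId are assumed to be in scope; exception handling for the ReplicationException
that getWALsInQueue can throw is elided):

    // Group the WAL names of one replication queue by their wal-group prefix.
    Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
    for (String wal : queueStorage.getWALsInQueue(server.getServerName(), queueId)) {
      String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
      walsByGroup.computeIfAbsent(walPrefix, p -> new TreeSet<>()).add(wal);
    }

The trade-off is an extra read of the queue storage per call, in exchange for one less cache
to keep consistent under the locks described in the removed class comment.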


[hbase] 04/05: HBASE-24683 Add a basic ReplicationServer which only implement ReplicationSink Service (#2111)

zghao pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 2e95e759f6fc8c7f35f9564429618969c29ea8aa
Author: XinSun <dd...@gmail.com>
AuthorDate: Fri Sep 4 18:53:46 2020 +0800

    HBASE-24683 Add a basic ReplicationServer which only implement ReplicationSink Service (#2111)
    
    Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
 .../java/org/apache/hadoop/hbase/util/DNS.java     |   3 +-
 .../hbase/replication/HReplicationServer.java      | 391 ++++++++++++++++
 .../replication/ReplicationServerRpcServices.java  | 516 +++++++++++++++++++++
 .../hbase/replication/TestReplicationServer.java   | 151 ++++++
 4 files changed, 1060 insertions(+), 1 deletion(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DNS.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DNS.java
index 2b4e1cb..ddff6db 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DNS.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DNS.java
@@ -54,7 +54,8 @@ public final class DNS {
 
   public enum ServerType {
     MASTER("master"),
-    REGIONSERVER("regionserver");
+    REGIONSERVER("regionserver"),
+    REPLICATIONSERVER("replicationserver");
 
     private String name;
     ServerType(String name) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
new file mode 100644
index 0000000..31dec0c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
@@ -0,0 +1,391 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.regionserver.ReplicationService;
+import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.trace.TraceUtil;
+import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HReplicationServer is responsible for all replication work. It checks in with
+ * the HMaster. There are many HReplicationServers in a single HBase deployment.
+ */
+@InterfaceAudience.Private
+@SuppressWarnings({ "deprecation"})
+public class HReplicationServer extends Thread implements Server {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HReplicationServer.class);
+
+  /** replication server process name */
+  public static final String REPLICATION_SERVER = "replicationserver";
+
+  /**
+   * This server's start code.
+   */
+  protected final long startCode;
+
+  private volatile boolean stopped = false;
+
+  // Go down hard. Used if file system becomes unavailable and also in
+  // debugging and unit tests.
+  private AtomicBoolean abortRequested;
+
+  // flag set after we're done setting up server threads
+  final AtomicBoolean online = new AtomicBoolean(false);
+
+  /**
+   * The server name the Master sees us as. It's made from the hostname the
+   * master passes us, the port, and the server start code. Gets set after
+   * registration against the Master.
+   */
+  private ServerName serverName;
+
+  protected final Configuration conf;
+
+  private ReplicationSinkService replicationSinkService;
+
+  final int msgInterval;
+  // A sleeper that sleeps for msgInterval.
+  protected final Sleeper sleeper;
+
+  // zookeeper connection and watcher
+  protected final ZKWatcher zooKeeper;
+
+  /**
+   * The asynchronous cluster connection to be shared by services.
+   */
+  protected AsyncClusterConnection asyncClusterConnection;
+
+  private UserProvider userProvider;
+
+  protected final ReplicationServerRpcServices rpcServices;
+
+  public HReplicationServer(final Configuration conf) throws IOException {
+    TraceUtil.initTracer(conf);
+    try {
+      this.startCode = System.currentTimeMillis();
+      this.conf = conf;
+
+      this.abortRequested = new AtomicBoolean(false);
+
+      this.rpcServices = createRpcServices();
+
+      String hostName = this.rpcServices.isa.getHostName();
+      serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startCode);
+
+      this.userProvider = UserProvider.instantiate(conf);
+
+      this.msgInterval = conf.getInt("hbase.replicationserver.msginterval", 3 * 1000);
+      this.sleeper = new Sleeper(this.msgInterval, this);
+
+      // Some unit tests don't need a cluster, so no zookeeper at all
+      if (!conf.getBoolean("hbase.testing.nocluster", false)) {
+        // Open connection to zookeeper and set primary watcher
+        zooKeeper = new ZKWatcher(conf, getProcessName() + ":" +
+            rpcServices.isa.getPort(), this, false);
+      } else {
+        zooKeeper = null;
+      }
+
+      this.rpcServices.start(zooKeeper);
+    } catch (Throwable t) {
+      // Make sure we log the exception. HReplicationServer is often started via reflection and the
+      // cause of failed startup is lost.
+      LOG.error("Failed construction ReplicationServer", t);
+      throw t;
+    }
+  }
+
+  public String getProcessName() {
+    return REPLICATION_SERVER;
+  }
+
+  @Override
+  public void run() {
+    if (isStopped()) {
+      LOG.info("Skipping run; stopped");
+      return;
+    }
+    try {
+      // Do pre-registration initializations; zookeeper, lease threads, etc.
+      preRegistrationInitialization();
+    } catch (Throwable e) {
+      abort("Fatal exception during initialization", e);
+    }
+    try {
+      setupReplication();
+      startReplicationService();
+
+      online.set(true);
+
+      long lastMsg = System.currentTimeMillis();
+      // The main run loop.
+      while (!isStopped()) {
+        long now = System.currentTimeMillis();
+        if ((now - lastMsg) >= msgInterval) {
+          lastMsg = System.currentTimeMillis();
+        }
+        if (!isStopped() && !isAborted()) {
+          this.sleeper.sleep();
+        }
+      }
+
+      stopServiceThreads();
+
+      if (this.rpcServices != null) {
+        this.rpcServices.stop();
+      }
+    } catch (Throwable t) {
+      abort(t.getMessage(), t);
+    }
+
+    if (this.zooKeeper != null) {
+      this.zooKeeper.close();
+    }
+    LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection closed.");
+  }
+
+  private Configuration cleanupConfiguration() {
+    Configuration conf = this.conf;
+    conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+        HConstants.ZK_CONNECTION_REGISTRY_CLASS);
+    if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
+      // Use server ZK cluster for server-issued connections, so we clone
+      // the conf and unset the client ZK related properties
+      conf = new Configuration(this.conf);
+      conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
+    }
+    return conf;
+  }
+
+  /**
+   * All initialization needed before we go register with Master.<br>
+   * Do bare minimum. Do bulk of initializations AFTER we've connected to the Master.<br>
+   * In here we just put up the RpcServer and set up the Connection and ZooKeeper.
+   */
+  private void preRegistrationInitialization() {
+    try {
+      setupClusterConnection();
+    } catch (Throwable t) {
+      // Call stop on error, or the process will stick around forever since the server
+      // puts up non-daemon threads.
+      this.rpcServices.stop();
+      abort("Initialization of replication server failed. Hence aborting.", t);
+    }
+  }
+
+  /**
+   * Setup our cluster connection if not already initialized.
+   */
+  protected final synchronized void setupClusterConnection() throws IOException {
+    if (asyncClusterConnection == null) {
+      Configuration conf = cleanupConfiguration();
+      InetSocketAddress localAddress = new InetSocketAddress(this.rpcServices.isa.getAddress(), 0);
+      User user = userProvider.getCurrent();
+      asyncClusterConnection =
+          ClusterConnectionFactory.createAsyncClusterConnection(conf, localAddress, user);
+    }
+  }
+
+  /**
+   * Wait on all threads to finish. Presumption is that all closes and stops
+   * have already been called.
+   */
+  protected void stopServiceThreads() {
+    if (this.replicationSinkService != null) {
+      this.replicationSinkService.stopReplicationService();
+    }
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  @Override
+  public ZKWatcher getZooKeeper() {
+    return zooKeeper;
+  }
+
+  @Override
+  public Connection getConnection() {
+    return getAsyncConnection().toConnection();
+  }
+
+  @Override
+  public Connection createConnection(Configuration conf) throws IOException {
+    throw new DoNotRetryIOException(new UnsupportedOperationException("This is a ReplicationServer."));
+  }
+
+  @Override
+  public AsyncClusterConnection getAsyncClusterConnection() {
+    return this.asyncClusterConnection;
+  }
+
+  @Override
+  public ServerName getServerName() {
+    return serverName;
+  }
+
+  @Override
+  public CoordinatedStateManager getCoordinatedStateManager() {
+    return null;
+  }
+
+  @Override
+  public ChoreService getChoreService() {
+    return null;
+  }
+
+  @Override
+  public void abort(String why, Throwable cause) {
+    if (!setAbortRequested()) {
+      // Abort already in progress, ignore the new request.
+      LOG.debug(
+          "Abort already in progress. Ignoring the current request with reason: {}", why);
+      return;
+    }
+    String msg = "***** ABORTING replication server " + this + ": " + why + " *****";
+    if (cause != null) {
+      LOG.error(HBaseMarkers.FATAL, msg, cause);
+    } else {
+      LOG.error(HBaseMarkers.FATAL, msg);
+    }
+    stop(why);
+  }
+
+  @Override
+  public boolean isAborted() {
+    return abortRequested.get();
+  }
+
+  @Override
+  public void stop(final String msg) {
+    if (!this.stopped) {
+      LOG.info("***** STOPPING region server '" + this + "' *****");
+      this.stopped = true;
+      LOG.info("STOPPED: " + msg);
+      // Wakes run() if it is sleeping
+      sleeper.skipSleepCycle();
+    }
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stopped;
+  }
+
+  /**
+   * Set up replication if enabled. Replication setup is done in here because it needs to be
+   * hooked up to the WAL.
+   */
+  private void setupReplication() throws IOException {
+    // Instantiate replication if replication enabled. Pass it the log directories.
+    createNewReplicationInstance(conf, this);
+  }
+
+  /**
+   * Load the replication executorService objects, if any
+   */
+  private static void createNewReplicationInstance(Configuration conf, HReplicationServer server)
+      throws IOException {
+    // read in the name of the sink replication class from the config file.
+    String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
+        HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
+
+    server.replicationSinkService = newReplicationInstance(sinkClassname,
+        ReplicationSinkService.class, conf, server);
+  }
+
+  private static <T extends ReplicationService> T newReplicationInstance(String classname,
+      Class<T> xface, Configuration conf, HReplicationServer server) throws IOException {
+    final Class<? extends T> clazz;
+    try {
+      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+      clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
+    } catch (java.lang.ClassNotFoundException nfe) {
+      throw new IOException("Could not find class for " + classname);
+    }
+    T service = ReflectionUtils.newInstance(clazz, conf);
+    service.initialize(server, null, null, null, null);
+    return service;
+  }
+
+  /**
+   * Start up replication source and sink handlers.
+   */
+  private void startReplicationService() throws IOException {
+    if (this.replicationSinkService != null) {
+      this.replicationSinkService.startReplicationService();
+    }
+  }
+
+  /**
+   * @return the object that implements the replication sink executorService.
+   */
+  public ReplicationSinkService getReplicationSinkService() {
+    return replicationSinkService;
+  }
+
+  /**
+   * Report the status of the server. A server is online once all the startup is
+   * completed (setting up filesystem, starting executorService threads, etc.). This
+   * method is designed mostly to be useful in tests.
+   *
+   * @return true if online, false if not.
+   */
+  public boolean isOnline() {
+    return online.get();
+  }
+
+  protected ReplicationServerRpcServices createRpcServices() throws IOException {
+    return new ReplicationServerRpcServices(this);
+  }
+
+  /**
+   * Sets the abort state if not already set.
+   * @return true if abortRequested was successfully set to true, false if an abort is already
+   * in progress.
+   */
+  protected boolean setAbortRequested() {
+    return abortRequested.compareAndSet(false, true);
+  }
+}
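
Since HReplicationServer extends Thread and implements Server, standing one up looks roughly
like the sketch below. This is a minimal illustration using only the constructor and methods
shown above; the port override and the polling loop are assumptions for the example, and
exception handling (IOException from the constructor, InterruptedException from sleep) is
elided:

    Configuration conf = HBaseConfiguration.create();
    conf.setInt(ReplicationServerRpcServices.REPLICATION_SERVER_PORT, 16040);
    HReplicationServer replicationServer = new HReplicationServer(conf);
    replicationServer.start();                 // Thread.start(): run() enters the main loop
    while (!replicationServer.isOnline()) {    // wait until startup completes
      Thread.sleep(100);
    }
    // ... later, shut it down; this wakes the sleeper and exits the run loop.
    replicationServer.stop("test done");
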
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
new file mode 100644
index 0000000..1b9b699
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java
@@ -0,0 +1,516 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.client.ConnectionUtils;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
+import org.apache.hadoop.hbase.ipc.QosPriority;
+import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
+import org.apache.hadoop.hbase.ipc.RpcServerFactory;
+import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.net.Address;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
+import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
+import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.access.AccessChecker;
+import org.apache.hadoop.hbase.security.access.NoopAccessChecker;
+import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
+import org.apache.hadoop.hbase.util.DNS;
+import org.apache.hadoop.hbase.util.DNS.ServerType;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+/**
+ * Implements the RPC services for {@link HReplicationServer}.
+ */
+@InterfaceAudience.Private
+@SuppressWarnings("deprecation")
+public class ReplicationServerRpcServices implements HBaseRPCErrorHandler,
+    AdminService.BlockingInterface, PriorityFunction {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(ReplicationServerRpcServices.class);
+
+  /** Parameter name for the port the replication server listens on. */
+  public static final String REPLICATION_SERVER_PORT = "hbase.replicationserver.port";
+
+  /** Default port the replication server listens on. */
+  public static final int DEFAULT_REPLICATION_SERVER_PORT = 16040;
+
+  /** Default port for the replication server web API. */
+  public static final int DEFAULT_REPLICATION_SERVER_INFOPORT = 16050;
+
+  // Request counter.
+  final LongAdder requestCount = new LongAdder();
+
+  // Server to handle client requests.
+  final RpcServerInterface rpcServer;
+  final InetSocketAddress isa;
+
+  protected final HReplicationServer replicationServer;
+
+  // The reference to the priority extraction function
+  private final PriorityFunction priority;
+
+  private AccessChecker accessChecker;
+  private ZKPermissionWatcher zkPermissionWatcher;
+
+  public ReplicationServerRpcServices(final HReplicationServer rs) throws IOException {
+    final Configuration conf = rs.getConfiguration();
+    replicationServer = rs;
+
+    final RpcSchedulerFactory rpcSchedulerFactory;
+    try {
+      rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class)
+          .getDeclaredConstructor().newInstance();
+    } catch (NoSuchMethodException | InvocationTargetException |
+        InstantiationException | IllegalAccessException e) {
+      throw new IllegalArgumentException(e);
+    }
+    // Server to handle client requests.
+    final InetSocketAddress initialIsa;
+    final InetSocketAddress bindAddress;
+
+    String hostname = DNS.getHostname(conf, ServerType.REPLICATIONSERVER);
+    int port = conf.getInt(REPLICATION_SERVER_PORT, DEFAULT_REPLICATION_SERVER_PORT);
+    // Creation of a HSA will force a resolve.
+    initialIsa = new InetSocketAddress(hostname, port);
+    bindAddress = new InetSocketAddress(
+        conf.get("hbase.replicationserver.ipc.address", hostname), port);
+
+    if (initialIsa.getAddress() == null) {
+      throw new IllegalArgumentException("Failed resolve of " + initialIsa);
+    }
+    priority = createPriority();
+    // Using Address means we don't get the IP too. Shorten it even more, to just the host name
+    // without the domain.
+    final String name = rs.getProcessName() + "/" +
+        Address.fromParts(initialIsa.getHostName(), initialIsa.getPort()).toStringWithoutDomain();
+    // Set how many times to retry talking to another server over Connection.
+    ConnectionUtils.setServerSideHConnectionRetriesConfig(conf, name, LOG);
+    rpcServer = createRpcServer(rs, rpcSchedulerFactory, bindAddress, name);
+
+    final InetSocketAddress address = rpcServer.getListenerAddress();
+    if (address == null) {
+      throw new IOException("Listener channel is closed");
+    }
+    // Set our address; we need the final port that the rpcServer actually bound to.
+    isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
+    rpcServer.setErrorHandler(this);
+    rs.setName(name);
+  }
+
+  protected RpcServerInterface createRpcServer(
+      final Server server,
+      final RpcSchedulerFactory rpcSchedulerFactory,
+      final InetSocketAddress bindAddress,
+      final String name
+  ) throws IOException {
+    final Configuration conf = server.getConfiguration();
+    boolean reservoirEnabled = conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
+    try {
+      return RpcServerFactory.createRpcServer(server, name, getServices(),
+          bindAddress, // use final bindAddress for this server.
+          conf, rpcSchedulerFactory.create(conf, this, server), reservoirEnabled);
+    } catch (BindException be) {
+      throw new IOException(be.getMessage() + ". To switch ports use the '"
+          + REPLICATION_SERVER_PORT + "' configuration property.",
+          be.getCause() != null ? be.getCause() : be);
+    }
+  }
+
+  protected Class<?> getRpcSchedulerFactoryClass() {
+    final Configuration conf = replicationServer.getConfiguration();
+    return conf.getClass(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
+      SimpleRpcSchedulerFactory.class);
+  }
+
+  public PriorityFunction getPriority() {
+    return priority;
+  }
+
+  public Configuration getConfiguration() {
+    return replicationServer.getConfiguration();
+  }
+
+  void start(ZKWatcher zkWatcher) {
+    if (AccessChecker.isAuthorizationSupported(getConfiguration())) {
+      accessChecker = new AccessChecker(getConfiguration());
+    } else {
+      accessChecker = new NoopAccessChecker(getConfiguration());
+    }
+    if (!getConfiguration().getBoolean("hbase.testing.nocluster", false) && zkWatcher != null) {
+      zkPermissionWatcher =
+          new ZKPermissionWatcher(zkWatcher, accessChecker.getAuthManager(), getConfiguration());
+      try {
+        zkPermissionWatcher.start();
+      } catch (KeeperException e) {
+        LOG.error("ZooKeeper permission watcher initialization failed", e);
+      }
+    }
+    rpcServer.start();
+  }
+
+  void stop() {
+    if (zkPermissionWatcher != null) {
+      zkPermissionWatcher.close();
+    }
+    rpcServer.stop();
+  }
+
+  /**
+   * By default, put up an Admin Service.
+   * @return immutable list of blocking services and the security info classes that this server
+   *   supports
+   */
+  protected List<BlockingServiceAndInterface> getServices() {
+    List<BlockingServiceAndInterface> bssi = new ArrayList<>();
+    bssi.add(new BlockingServiceAndInterface(
+      AdminService.newReflectiveBlockingService(this),
+      AdminService.BlockingInterface.class));
+    return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
+  }
+
+  public InetSocketAddress getSocketAddress() {
+    return isa;
+  }
+
+  @Override
+  public int getPriority(RequestHeader header, Message param, User user) {
+    return priority.getPriority(header, param, user);
+  }
+
+  @Override
+  public long getDeadline(RequestHeader header, Message param) {
+    return priority.getDeadline(header, param);
+  }
+
+  /*
+   * Check if the throwable is an OOME and, if so, abort immediately to avoid more allocations.
+   *
+   * @param e the throwable to check
+   *
+   * @return True if we OOME'd and are aborting.
+   */
+  @Override
+  public boolean checkOOME(final Throwable e) {
+    return exitIfOOME(e);
+  }
+
+  public static boolean exitIfOOME(final Throwable e) {
+    boolean stop = false;
+    try {
+      if (e instanceof OutOfMemoryError
+          || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
+          || (e.getMessage() != null && e.getMessage().contains(
+              "java.lang.OutOfMemoryError"))) {
+        stop = true;
+        LOG.error(HBaseMarkers.FATAL, "Ran out of memory; "
+          + ReplicationServerRpcServices.class.getSimpleName() + " will abort itself immediately",
+          e);
+      }
+    } finally {
+      if (stop) {
+        Runtime.getRuntime().halt(1);
+      }
+    }
+    return stop;
+  }
+
+  /**
+   * Called to verify that this server is up and running.
+   */
+  protected void checkOpen() throws IOException {
+    if (replicationServer.isAborted()) {
+      throw new RegionServerAbortedException("Server " + replicationServer.getServerName()
+          + " aborting");
+    }
+    if (replicationServer.isStopped()) {
+      throw new RegionServerStoppedException("Server " + replicationServer.getServerName()
+          + " stopping");
+    }
+    if (!replicationServer.isOnline()) {
+      throw new ServerNotRunningYetException("Server " + replicationServer.getServerName()
+          + " is not running yet");
+    }
+  }
+
+  @Override
+  public GetRegionInfoResponse getRegionInfo(RpcController controller, GetRegionInfoRequest request)
+      throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public GetStoreFileResponse getStoreFile(RpcController controller, GetStoreFileRequest request)
+      throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public GetOnlineRegionResponse getOnlineRegion(RpcController controller,
+      GetOnlineRegionRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public OpenRegionResponse openRegion(RpcController controller, OpenRegionRequest request)
+      throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public WarmupRegionResponse warmupRegion(RpcController controller, WarmupRegionRequest request)
+      throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public CloseRegionResponse closeRegion(RpcController controller, CloseRegionRequest request)
+      throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public FlushRegionResponse flushRegion(RpcController controller, FlushRegionRequest request)
+      throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public CompactionSwitchResponse compactionSwitch(RpcController controller,
+      CompactionSwitchRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public CompactRegionResponse compactRegion(RpcController controller,
+      CompactRegionRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public ReplicateWALEntryResponse replay(RpcController controller,
+      ReplicateWALEntryRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public RollWALWriterResponse rollWALWriter(RpcController controller, RollWALWriterRequest request)
+      throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public GetServerInfoResponse getServerInfo(RpcController controller, GetServerInfoRequest request)
+      throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  /**
+   * Stop the replication server.
+   *
+   * @param controller the RPC controller
+   * @param request the request
+   */
+  @Override
+  @QosPriority(priority=HConstants.ADMIN_QOS)
+  public StopServerResponse stopServer(final RpcController controller,
+      final StopServerRequest request) {
+    requestCount.increment();
+    String reason = request.getReason();
+    replicationServer.stop(reason);
+    return StopServerResponse.newBuilder().build();
+  }
+
+  @Override
+  public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
+      UpdateFavoredNodesRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public UpdateConfigurationResponse updateConfiguration(RpcController controller,
+      UpdateConfigurationRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public GetRegionLoadResponse getRegionLoad(RpcController controller,
+      GetRegionLoadRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
+      ClearCompactionQueuesRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
+      ClearRegionBlockCacheRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
+      GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public ExecuteProceduresResponse executeProcedures(RpcController controller,
+      ExecuteProceduresRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public SlowLogResponses getSlowLogResponses(RpcController controller,
+      SlowLogResponseRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public SlowLogResponses getLargeLogResponses(RpcController controller,
+      SlowLogResponseRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  @Override
+  public ClearSlowLogResponses clearSlowLogsResponses(RpcController controller,
+      ClearSlowLogResponseRequest request) throws ServiceException {
+    throw new ServiceException(new UnsupportedOperationException("This is a replication server"));
+  }
+
+  protected AccessChecker getAccessChecker() {
+    return accessChecker;
+  }
+
+  protected PriorityFunction createPriority() {
+    return new PriorityFunction() {
+      @Override
+      public int getPriority(RequestHeader header, Message param, User user) {
+        return 0;
+      }
+
+      @Override
+      public long getDeadline(RequestHeader header, Message param) {
+        return 0;
+      }
+    };
+  }
+
+  @Override
+  public ReplicateWALEntryResponse replicateWALEntry(RpcController controller,
+      ReplicateWALEntryRequest request) throws ServiceException {
+    try {
+      checkOpen();
+      if (replicationServer.getReplicationSinkService() != null) {
+        requestCount.increment();
+        List<WALEntry> entries = request.getEntryList();
+        CellScanner cellScanner = ((HBaseRpcController) controller).cellScanner();
+        // TODO: CP pre
+        replicationServer.getReplicationSinkService().replicateLogEntries(entries, cellScanner,
+            request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
+            request.getSourceHFileArchiveDirPath());
+        // TODO: CP post
+        return ReplicateWALEntryResponse.newBuilder().build();
+      } else {
+        throw new ServiceException("Replication services are not initialized yet");
+      }
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+  }
+}
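
For reference, a minimal sketch of the expected lifecycle of this class,
assuming an already-constructed HReplicationServer (rs) and a live ZKWatcher
(zkWatcher); only the constructor, start() and stop() come from the class
above, the surrounding variables are hypothetical:

    // Hedged sketch: lifecycle of ReplicationServerRpcServices.
    ReplicationServerRpcServices rpcServices = new ReplicationServerRpcServices(rs);
    // Starts the RpcServer and, when authorization is enabled, the ZK permission watcher.
    rpcServices.start(zkWatcher);
    try {
      // Serve replicateWALEntry traffic; every other AdminService method fails
      // with an UnsupportedOperationException wrapped in a ServiceException.
    } finally {
      // Closes the permission watcher (if any) and stops the RpcServer.
      rpcServices.stop();
    }
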
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
new file mode 100644
index 0000000..6a0ef3d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ReplicationTests.class, MediumTests.class})
+public class TestReplicationServer {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestReplicationServer.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationServer.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static Configuration CONF = TEST_UTIL.getConfiguration();
+
+  private static HMaster MASTER;
+
+  private static HReplicationServer replicationServer;
+
+  private static Path baseNamespaceDir;
+  private static Path hfileArchiveDir;
+  private static String replicationClusterId;
+
+  private static int BATCH_SIZE = 10;
+
+  private static TableName TABLENAME = TableName.valueOf("t");
+  private static String FAMILY = "C";
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster();
+    MASTER = TEST_UTIL.getMiniHBaseCluster().getMaster();
+
+    replicationServer = new HReplicationServer(CONF);
+    replicationServer.start();
+
+    TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster();
+    TEST_UTIL.waitFor(60000, () -> replicationServer.isOnline());
+
+    Path rootDir = CommonFSUtils.getRootDir(CONF);
+    baseNamespaceDir = new Path(rootDir, new Path(HConstants.BASE_NAMESPACE_DIR));
+    hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY));
+    replicationClusterId = "12345";
+  }
+
+  @AfterClass
+  public static void afterClass() throws IOException {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void before() throws Exception {
+    TEST_UTIL.createTable(TABLENAME, FAMILY);
+    TEST_UTIL.waitTableAvailable(TABLENAME);
+  }
+
+  @After
+  public void after() throws IOException {
+    TEST_UTIL.deleteTableIfAny(TABLENAME);
+  }
+
+  @Test
+  public void testReplicateWAL() throws Exception {
+    AsyncClusterConnection conn = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().get(0)
+        .getRegionServer().getAsyncClusterConnection();
+    AsyncRegionServerAdmin rsAdmin = conn.getRegionServerAdmin(replicationServer.getServerName());
+
+    Entry[] entries = new Entry[BATCH_SIZE];
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      entries[i] = generateEdit(i, TABLENAME, Bytes.toBytes(i));
+    }
+
+    ReplicationProtbufUtil.replicateWALEntry(rsAdmin, entries, replicationClusterId,
+        baseNamespaceDir, hfileArchiveDir, 1000);
+
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      Table table = TEST_UTIL.getConnection().getTable(TABLENAME);
+      Result result = table.get(new Get(Bytes.toBytes(i)));
+      Cell cell = result.getColumnLatestCell(Bytes.toBytes(FAMILY), Bytes.toBytes(FAMILY));
+      assertNotNull(cell);
+      assertTrue(Bytes.equals(CellUtil.cloneValue(cell), Bytes.toBytes(i)));
+    }
+  }
+
+  private static WAL.Entry generateEdit(int i, TableName tableName, byte[] row) {
+    Threads.sleep(1);
+    long timestamp = System.currentTimeMillis();
+    WALKeyImpl key = new WALKeyImpl(new byte[32], tableName, i, timestamp,
+        HConstants.DEFAULT_CLUSTER_ID, null);
+    WALEdit edit = new WALEdit();
+    edit.add(new KeyValue(row, Bytes.toBytes(FAMILY), Bytes.toBytes(FAMILY), timestamp, row));
+    return new WAL.Entry(key, edit);
+  }
+}
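
If the default ports collide in a particular test environment, the replication
server's listener port can be overridden before startup via the configuration
key introduced above; a hedged sketch (16041 is an arbitrary example value, not
something this patch sets):

    // Sketch: overriding the replication server RPC port, assuming the same
    // TEST_UTIL / HReplicationServer setup as in the test above.
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setInt(ReplicationServerRpcServices.REPLICATION_SERVER_PORT, 16041);
    HReplicationServer server = new HReplicationServer(conf);
    server.start();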


[hbase] 03/05: HBASE-24735: Refactor ReplicationSourceManager: move logPositionAndCleanOldLogs/cleanUpHFileRefs to ReplicationSource inside (#2064)

Posted by zg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit d490870ba8c2bbfbdcbd4cef1801bf5f0c57986e
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Tue Aug 11 20:07:09 2020 +0800

    HBASE-24735: Refactor ReplicationSourceManager: move logPositionAndCleanOldLogs/cleanUpHFileRefs to ReplicationSource inside (#2064)
    
    Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
 .../regionserver/RecoveredReplicationSource.java   |  18 ++-
 .../regionserver/ReplicationSource.java            | 160 +++++++++++++++++++--
 .../regionserver/ReplicationSourceInterface.java   |  33 +++--
 .../regionserver/ReplicationSourceManager.java     | 158 ++------------------
 .../regionserver/ReplicationSourceShipper.java     |  17 +--
 .../regionserver/ReplicationSourceWALReader.java   |  17 +--
 .../SerialReplicationSourceWALReader.java          |   1 +
 .../replication/regionserver/WALEntryBatch.java    |   2 +-
 .../hbase/replication/ReplicationSourceDummy.java  |  24 ++--
 .../regionserver/TestReplicationSource.java        |  12 +-
 .../regionserver/TestReplicationSourceManager.java |  50 +++----
 .../regionserver/TestWALEntryStream.java           |  20 +--
 12 files changed, 258 insertions(+), 254 deletions(-)
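
In short, this commit moves position tracking and old-WAL cleanup from
ReplicationSourceManager into ReplicationSource itself: where the shipper
previously went through the manager, it now calls the source directly. A
minimal before/after sketch, using the same names that appear in the diffs
below:

    // Before (manager-centric):
    source.getSourceManager().logPositionAndCleanOldLogs(source, batch);

    // After (source-centric):
    source.setWALPosition(batch);
    source.cleanOldWALs(batch.getLastWalPath().getName(), batch.isEndOfFile());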

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 00aa026..62685ee 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.PriorityBlockingQueue;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -44,15 +45,18 @@ public class RecoveredReplicationSource extends ReplicationSource {
 
   private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class);
 
+  private Path walDir;
+
   private String actualPeerId;
 
   @Override
-  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
-      String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
-      MetricsSource metrics) throws IOException {
-    super.init(conf, fs, manager, queueStorage, replicationPeer, server, peerClusterZnode,
+  public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
+    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
+    String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
+    MetricsSource metrics) throws IOException {
+    super.init(conf, fs, walDir, manager, queueStorage, replicationPeer, server, peerClusterZnode,
       clusterId, walFileLengthProvider, metrics);
+    this.walDir = walDir;
     this.actualPeerId = this.replicationQueueInfo.getPeerId();
   }
 
@@ -93,7 +97,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
               deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) };
           for (Path possibleLogLocation : locs) {
             LOG.info("Possible location " + possibleLogLocation.toUri().toString());
-            if (manager.getFs().exists(possibleLogLocation)) {
+            if (this.fs.exists(possibleLogLocation)) {
               // We found the right new location
               LOG.info("Log " + path + " still exists at " + possibleLogLocation);
               newPaths.add(possibleLogLocation);
@@ -126,7 +130,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
   // N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal
   // area rather than to the wal area for a particular region server.
   private Path getReplSyncUpPath(Path path) throws IOException {
-    FileStatus[] rss = fs.listStatus(manager.getLogDir());
+    FileStatus[] rss = fs.listStatus(walDir);
     for (FileStatus rs : rss) {
       Path p = rs.getPath();
       FileStatus[] logs = fs.listStatus(p);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 85c4657..fd9fb31 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -28,7 +28,9 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -36,6 +38,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -52,16 +55,20 @@ import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
 import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
 import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -93,7 +100,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
   protected ReplicationQueueInfo replicationQueueInfo;
 
   // The manager of all sources to which we ping back our progress
-  protected ReplicationSourceManager manager;
+  ReplicationSourceManager manager;
   // Should we stop everything?
   protected Server server;
   // How long should we sleep for each retry
@@ -130,8 +137,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
   protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
       new ConcurrentHashMap<>();
 
-  private AtomicLong totalBufferUsed;
-
   public static final String WAIT_ON_ENDPOINT_SECONDS =
     "hbase.replication.wait.on.endpoint.seconds";
   public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
@@ -183,7 +188,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
    * @param metrics metrics for replication source
    */
   @Override
-  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
+  public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
       ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
       String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
       MetricsSource metrics) throws IOException {
@@ -211,7 +216,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
     defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
     currentBandwidth = getCurrentBandwidth();
     this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
-    this.totalBufferUsed = manager.getTotalBufferUsed();
     this.walFileLengthProvider = walFileLengthProvider;
     LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
       replicationPeer.getId(), this.currentBandwidth);
@@ -399,9 +403,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
 
   private ReplicationSourceWALReader createNewWALReader(String walGroupId,
       PriorityBlockingQueue<Path> queue, long startPosition) {
-    return replicationPeer.getPeerConfig().isSerial()
-      ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this)
-      : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
+    return replicationPeer.getPeerConfig().isSerial() ?
+      new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this) :
+      new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
   }
 
   /**
@@ -427,11 +431,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
   }
 
   @Override
-  public ReplicationSourceManager getSourceManager() {
-    return this.manager;
-  }
-
-  @Override
   public void tryThrottle(int batchSize) throws InterruptedException {
     checkBandwidthChangeAndResetThrottler();
     if (throttler.isEnabled()) {
@@ -735,7 +734,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
       throttler.addPushSize(batchSize);
     }
     totalReplicatedEdits.addAndGet(entries.size());
-    long newBufferUsed = totalBufferUsed.addAndGet(-batchSize);
+    long newBufferUsed = manager.getTotalBufferUsed().addAndGet(-batchSize);
     // Record the new buffer usage
     this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
   }
@@ -770,4 +769,137 @@ public class ReplicationSource implements ReplicationSourceInterface {
   private String logPeerId(){
     return "[Source for peer " + this.getPeer().getId() + "]:";
   }
+
+  @VisibleForTesting
+  public void setWALPosition(WALEntryBatch entryBatch) {
+    String fileName = entryBatch.getLastWalPath().getName();
+    interruptOrAbortWhenFail(() -> this.queueStorage
+      .setWALPosition(server.getServerName(), getQueueId(), fileName,
+        entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
+  }
+
+  @VisibleForTesting
+  public void cleanOldWALs(String log, boolean inclusive) {
+    NavigableSet<String> walsToRemove = getWalsToRemove(log, inclusive);
+    if (walsToRemove.isEmpty()) {
+      return;
+    }
+    // cleanOldWALs may take some time, especially for sync replication where we may want to
+    // remove remote wals as the remote cluster may already be down, so we do it outside
+    // the lock to avoid blocking preLogRoll
+    cleanOldWALs(walsToRemove);
+  }
+
+  private NavigableSet<String> getWalsToRemove(String log, boolean inclusive) {
+    NavigableSet<String> walsToRemove = new TreeSet<>();
+    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
+    try {
+      this.queueStorage.getWALsInQueue(this.server.getServerName(), getQueueId()).forEach(wal -> {
+        LOG.debug("getWalsToRemove wal {}", wal);
+        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+        if (walPrefix.equals(logPrefix)) {
+          walsToRemove.add(wal);
+        }
+      });
+    } catch (ReplicationException e) {
+      // Just log the exception here, as the recovered replication source will try to clean up again.
+      LOG.warn("Failed to read wals in queue {}", getQueueId(), e);
+    }
+    return walsToRemove.headSet(log, inclusive);
+  }
+
+  private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
+    throws IOException {
+    Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
+    FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
+    for (String wal : wals) {
+      Path walFile = new Path(remoteWALDirForPeer, wal);
+      try {
+        if (!fs.delete(walFile, false) && fs.exists(walFile)) {
+          throw new IOException("Can not delete " + walFile);
+        }
+      } catch (FileNotFoundException e) {
+        // Just ignore since this means the file has already been deleted.
+        // The javadoc of the FileSystem.delete methods does not specify the behavior of deleting a
+        // nonexistent file, so here we deal with both, i.e., check the return value of
+        // FileSystem.delete, and also catch FNFE.
+        LOG.debug("The remote wal {} has already been deleted?", walFile, e);
+      }
+    }
+  }
+
+  private void cleanOldWALs(NavigableSet<String> wals) {
+    LOG.debug("Removing {} logs in the list: {}", wals.size(), wals);
+    // The intention here is that we want to delete the remote wal files ASAP, as it may affect the
+    // failover time if you want to transition the remote cluster from S to A. And the infinite
+    // retry is not a problem: if we cannot contact the remote HDFS cluster, then usually we cannot
+    // contact the HBase cluster either, so replication will be blocked anyway.
+    if (isSyncReplication()) {
+      String peerId = getPeerId();
+      String remoteWALDir = replicationPeer.getPeerConfig().getRemoteWALDir();
+      // Filter out the wals that need to be removed from the remote directory. Their names should
+      // be in the special sync replication format, and the peer id in each name should match the
+      // peer id of this replication source.
+      List<String> remoteWals = wals.stream().filter(w -> SyncReplicationWALProvider
+        .getSyncReplicationPeerIdFromWALName(w).map(peerId::equals).orElse(false))
+        .collect(Collectors.toList());
+      LOG.debug("Removing {} logs from remote dir {} in the list: {}", remoteWals.size(),
+        remoteWALDir, remoteWals);
+      if (!remoteWals.isEmpty()) {
+        for (int sleepMultiplier = 0;;) {
+          try {
+            removeRemoteWALs(peerId, remoteWALDir, remoteWals);
+            break;
+          } catch (IOException e) {
+            LOG.warn("Failed to delete remote wals from remote dir {} for peer {}", remoteWALDir,
+              peerId);
+          }
+          if (!isSourceActive()) {
+            // skip the following operations
+            return;
+          }
+          if (ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries,
+            sleepMultiplier, maxRetriesMultiplier)) {
+            sleepMultiplier++;
+          }
+        }
+      }
+    }
+    for (String wal : wals) {
+      interruptOrAbortWhenFail(
+        () -> this.queueStorage.removeWAL(server.getServerName(), getQueueId(), wal));
+    }
+  }
+
+  public void cleanUpHFileRefs(List<String> files) {
+    interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(getPeerId(), files));
+  }
+
+  @FunctionalInterface
+  private interface ReplicationQueueOperation {
+    void exec() throws ReplicationException;
+  }
+
+  /**
+   * Refreshing a replication source terminates the old source first, which interrupts the source
+   * thread. We need to handle the interrupt instead of aborting the region server.
+   */
+  private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
+    try {
+      op.exec();
+    } catch (ReplicationException e) {
+      if (e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException
+        && e.getCause().getCause() != null && e.getCause()
+        .getCause() instanceof InterruptedException) {
+        // A ReplicationRuntimeException (a RuntimeException) is thrown out here because the
+        // thread was interrupted deep down in the stack; the exception should bypass the
+        // processing logic below and propagate to the topmost layer that can handle it
+        // properly. In this specific case, the top layer is ReplicationSourceShipper#run().
+        throw new ReplicationRuntimeException(
+          "Thread is interrupted, the replication source may be terminated",
+          e.getCause().getCause());
+      }
+      server.abort("Failed to operate on replication queue", e);
+    }
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 33a413f..321edc2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -43,15 +43,15 @@ public interface ReplicationSourceInterface {
 
   /**
    * Initializer for the source
-   * @param conf the configuration to use
-   * @param fs the file system to use
-   * @param manager the manager to use
+   *
+   * @param conf   the configuration to use
+   * @param fs     the file system to use
    * @param server the server for this region server
    */
-  void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
-      String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
-      MetricsSource metrics) throws IOException;
+  void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
+    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
+    String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
+    MetricsSource metrics) throws IOException;
 
   /**
    * Add a log to the list of logs to replicate
@@ -148,11 +148,6 @@ public interface ReplicationSourceInterface {
   ReplicationEndpoint getReplicationEndpoint();
 
   /**
-   * @return the replication source manager
-   */
-  ReplicationSourceManager getSourceManager();
-
-  /**
    * @return the wal file length provider
    */
   WALFileLengthProvider getWALFileLengthProvider();
@@ -192,4 +187,18 @@ public interface ReplicationSourceInterface {
   default boolean isRecovered() {
     return false;
   }
+
+  /**
+   * Set the current position of WAL to {@link ReplicationQueueStorage}
+   * @param entryBatch a batch of WAL entries to replicate
+   */
+  void setWALPosition(WALEntryBatch entryBatch);
+
+  /**
+   * Cleans a WAL and all older WALs from replication queue. Called when we are sure that a WAL is
+   * closed and has no more entries.
+   * @param walName the name of WAL
+   * @param inclusive whether we should also remove the given WAL
+   */
+  void cleanOldWALs(String walName, boolean inclusive);
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index a222f4b..3212697 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -17,10 +17,8 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -59,14 +57,11 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
-import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -324,7 +319,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     MetricsSource metrics = new MetricsSource(queueId);
     sourceMetrics.put(queueId, metrics);
     // init replication source
-    src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
+    src.init(conf, fs, logDir, this, queueStorage, replicationPeer, server, queueId, clusterId,
       walFileLengthProvider, metrics);
     return src;
   }
@@ -528,29 +523,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     void exec() throws ReplicationException;
   }
 
-  /**
-   * Refresh replication source will terminate the old source first, then the source thread will be
-   * interrupted. Need to handle it instead of abort the region server.
-   */
-  private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
-    try {
-      op.exec();
-    } catch (ReplicationException e) {
-      if (e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException
-          && e.getCause().getCause() != null && e.getCause()
-          .getCause() instanceof InterruptedException) {
-        // ReplicationRuntimeException(a RuntimeException) is thrown out here. The reason is
-        // that thread is interrupted deep down in the stack, it should pass the following
-        // processing logic and propagate to the most top layer which can handle this exception
-        // properly. In this specific case, the top layer is ReplicationSourceShipper#run().
-        throw new ReplicationRuntimeException(
-          "Thread is interrupted, the replication source may be terminated",
-          e.getCause().getCause());
-      }
-      server.abort("Failed to operate on replication queue", e);
-    }
-  }
-
   private void abortWhenFail(ReplicationQueueOperation op) {
     try {
       op.exec();
@@ -576,107 +548,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     }
   }
 
-  /**
-   * This method will log the current position to storage. And also clean old logs from the
-   * replication queue.
-   * @param source the replication source
-   * @param entryBatch the wal entry batch we just shipped
-   */
-  public void logPositionAndCleanOldLogs(ReplicationSourceInterface source,
-      WALEntryBatch entryBatch) {
-    String fileName = entryBatch.getLastWalPath().getName();
-    interruptOrAbortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(),
-      source.getQueueId(), fileName, entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
-    cleanOldLogs(fileName, entryBatch.isEndOfFile(), source);
-  }
-
-  /**
-   * Cleans a log file and all older logs from replication queue. Called when we are sure that a log
-   * file is closed and has no more entries.
-   * @param log Path to the log
-   * @param inclusive whether we should also remove the given log file
-   * @param source the replication source
-   */
-  @VisibleForTesting
-  void cleanOldLogs(String log, boolean inclusive,
-    ReplicationSourceInterface source) {
-    NavigableSet<String> walsToRemove;
-    synchronized (this.latestPaths) {
-      walsToRemove = getWalsToRemove(source.getQueueId(), log, inclusive);
-    }
-    if (walsToRemove.isEmpty()) {
-      return;
-    }
-    // cleanOldLogs may spend some time, especially for sync replication where we may want to
-    // remove remote wals as the remote cluster may have already been down, so we do it outside
-    // the lock to avoid block preLogRoll
-    cleanOldLogs(walsToRemove, source);
-  }
-
-  private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
-      throws IOException {
-    Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
-    FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
-    for (String wal : wals) {
-      Path walFile = new Path(remoteWALDirForPeer, wal);
-      try {
-        if (!fs.delete(walFile, false) && fs.exists(walFile)) {
-          throw new IOException("Can not delete " + walFile);
-        }
-      } catch (FileNotFoundException e) {
-        // Just ignore since this means the file has already been deleted.
-        // The javadoc of the FileSystem.delete methods does not specify the behavior of deleting an
-        // inexistent file, so here we deal with both, i.e, check the return value of the
-        // FileSystem.delete, and also catch FNFE.
-        LOG.debug("The remote wal {} has already been deleted?", walFile, e);
-      }
-    }
-  }
-
-  private void cleanOldLogs(NavigableSet<String> wals, ReplicationSourceInterface source) {
-    LOG.debug("Removing {} logs in the list: {}", wals.size(), wals);
-    // The intention here is that, we want to delete the remote wal files ASAP as it may effect the
-    // failover time if you want to transit the remote cluster from S to A. And the infinite retry
-    // is not a problem, as if we can not contact with the remote HDFS cluster, then usually we can
-    // not contact with the HBase cluster either, so the replication will be blocked either.
-    if (source.isSyncReplication()) {
-      String peerId = source.getPeerId();
-      String remoteWALDir = source.getPeer().getPeerConfig().getRemoteWALDir();
-      // Filter out the wals need to be removed from the remote directory. Its name should be the
-      // special format, and also, the peer id in its name should match the peer id for the
-      // replication source.
-      List<String> remoteWals = wals.stream().filter(w -> SyncReplicationWALProvider
-        .getSyncReplicationPeerIdFromWALName(w).map(peerId::equals).orElse(false))
-        .collect(Collectors.toList());
-      LOG.debug("Removing {} logs from remote dir {} in the list: {}", remoteWals.size(),
-        remoteWALDir, remoteWals);
-      if (!remoteWals.isEmpty()) {
-        for (int sleepMultiplier = 0;;) {
-          try {
-            removeRemoteWALs(peerId, remoteWALDir, remoteWals);
-            break;
-          } catch (IOException e) {
-            LOG.warn("Failed to delete remote wals from remote dir {} for peer {}", remoteWALDir,
-              peerId);
-          }
-          if (!source.isSourceActive()) {
-            // skip the following operations
-            return;
-          }
-          if (ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries,
-            sleepMultiplier, maxRetriesMultiplier)) {
-            sleepMultiplier++;
-          }
-        }
-      }
-    }
-    String queueId = source.getQueueId();
-    for (String wal : wals) {
-      interruptOrAbortWhenFail(
-        () -> this.queueStorage.removeWAL(server.getServerName(), queueId, wal));
-    }
-  }
-
   // public because of we call it in TestReplicationEmptyWALRecovery
   @VisibleForTesting
   public void preLogRoll(Path newLog) throws IOException {
@@ -1092,10 +963,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     }
   }
 
-  public void cleanUpHFileRefs(String peerId, List<String> files) {
-    interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files));
-  }
-
   int activeFailoverTaskCount() {
     return executor.getActiveCount();
   }
@@ -1104,20 +971,13 @@ public class ReplicationSourceManager implements ReplicationListener {
     return this.globalMetrics;
   }
 
-  private NavigableSet<String> getWalsToRemove(String queueId, String log, boolean inclusive) {
-    NavigableSet<String> walsToRemove = new TreeSet<>();
-    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
-    try {
-      this.queueStorage.getWALsInQueue(this.server.getServerName(), queueId).forEach(wal -> {
-        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
-        if (walPrefix.equals(logPrefix)) {
-          walsToRemove.add(wal);
-        }
-      });
-    } catch (ReplicationException e) {
-      // Just log the exception here, as the recovered replication source will try to cleanup again.
-      LOG.warn("Failed to read wals in queue {}", queueId, e);
-    }
-    return walsToRemove.headSet(log, inclusive);
+  @InterfaceAudience.Private
+  Server getServer() {
+    return this.server;
+  }
+
+  @InterfaceAudience.Private
+  ReplicationQueueStorage getQueueStorage() {
+    return this.queueStorage;
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 72cc5e82..4250c76 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -240,12 +240,6 @@ public class ReplicationSourceShipper extends Thread {
   }
 
   private void cleanUpHFileRefs(WALEdit edit) throws IOException {
-    String peerId = source.getPeerId();
-    if (peerId.contains("-")) {
-      // peerClusterZnode will be in the form peerId + "-" + rsZNode.
-      // A peerId will not have "-" in its name, see HBASE-11394
-      peerId = peerId.split("-")[0];
-    }
     List<Cell> cells = edit.getCells();
     int totalCells = cells.size();
     for (int i = 0; i < totalCells; i++) {
@@ -256,7 +250,7 @@ public class ReplicationSourceShipper extends Thread {
         int totalStores = stores.size();
         for (int j = 0; j < totalStores; j++) {
           List<String> storeFileList = stores.get(j).getStoreFileList();
-          source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList);
+          source.cleanUpHFileRefs(storeFileList);
           source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size());
         }
       }
@@ -268,10 +262,11 @@ public class ReplicationSourceShipper extends Thread {
     // if end of file is true, then the logPositionAndCleanOldLogs method will remove the file
     // record on zk, so let's call it. The last wal position maybe zero if end of file is true and
     // there is no entry in the batch. It is OK because that the queue storage will ignore the zero
-    // position and the file will be removed soon in cleanOldLogs.
-    if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) ||
-      batch.getLastWalPosition() != currentPosition) {
-      source.getSourceManager().logPositionAndCleanOldLogs(source, batch);
+    // position and the file will be removed soon in cleanOldWALs.
+    if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath)
+      || batch.getLastWalPosition() != currentPosition) {
+      source.setWALPosition(batch);
+      source.cleanOldWALs(batch.getLastWalPath().getName(), batch.isEndOfFile());
       updated = true;
     }
     // if end of file is true, then we can just skip to the next file in queue.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index c71db1b..22cbd97 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -25,7 +25,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -74,9 +73,6 @@ class ReplicationSourceWALReader extends Thread {
   //Indicates whether this particular worker is running
   private boolean isReaderRunning = true;
 
-  private AtomicLong totalBufferUsed;
-  private long totalBufferQuota;
-
   /**
    * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
    * entries, and puts them on a batch queue.
@@ -102,8 +98,6 @@ class ReplicationSourceWALReader extends Thread {
     // memory used will be batchSizeCapacity * (nb.batches + 1)
     // the +1 is for the current thread reading before placing onto the queue
     int batchCount = conf.getInt("replication.source.nb.batches", 1);
-    this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
-    this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit();
     this.sleepForRetries =
         this.conf.getLong("replication.source.sleepforretries", 1000);    // 1 second
     this.maxRetriesMultiplier =
@@ -273,9 +267,10 @@ class ReplicationSourceWALReader extends Thread {
   //returns false if we've already exceeded the global quota
   private boolean checkQuota() {
     // try not to go over total quota
-    if (totalBufferUsed.get() > totalBufferQuota) {
+    if (source.manager.getTotalBufferUsed().get() > source.manager.getTotalBufferLimit()) {
       LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
-          this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota);
+        this.source.getPeerId(), source.manager.getTotalBufferUsed().get(),
+        source.manager.getTotalBufferLimit());
       Threads.sleep(sleepForRetries);
       return false;
     }
@@ -404,10 +399,10 @@ class ReplicationSourceWALReader extends Thread {
    * @return true if we should clear buffer and push all
    */
   private boolean acquireBufferQuota(long size) {
-    long newBufferUsed = totalBufferUsed.addAndGet(size);
+    long newBufferUsed = source.manager.getTotalBufferUsed().addAndGet(size);
     // Record the new buffer usage
-    this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
-    return newBufferUsed >= totalBufferQuota;
+    source.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+    return newBufferUsed >= source.manager.getTotalBufferLimit();
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
index 9edcc8a..2108ddc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
 import java.util.concurrent.PriorityBlockingQueue;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
index 4f96c96..591b44d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -31,7 +31,7 @@ import org.apache.yetus.audience.InterfaceAudience;
  * Holds a batch of WAL entries to replicate, along with some statistics
  */
 @InterfaceAudience.Private
-class WALEntryBatch {
+public class WALEntryBatch {
 
   // used by recovered replication queue to indicate that all the entries have been read.
   public static final WALEntryBatch NO_MORE_DATA = new WALEntryBatch(0, null);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 781a1da..b75a7ed 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
+import org.apache.hadoop.hbase.replication.regionserver.WALEntryBatch;
 import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
@@ -38,7 +39,6 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
  */
 public class ReplicationSourceDummy implements ReplicationSourceInterface {
 
-  private ReplicationSourceManager manager;
   private ReplicationPeer replicationPeer;
   private String peerClusterId;
   private Path currentPath;
@@ -47,11 +47,10 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   private AtomicBoolean startup = new AtomicBoolean(false);
 
   @Override
-  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-      ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
-      UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
-      throws IOException {
-    this.manager = manager;
+  public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
+    ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
+    UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
+    throws IOException {
     this.peerClusterId = peerClusterId;
     this.metrics = metrics;
     this.walFileLengthProvider = walFileLengthProvider;
@@ -133,11 +132,6 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   }
 
   @Override
-  public ReplicationSourceManager getSourceManager() {
-    return manager;
-  }
-
-  @Override
   public void tryThrottle(int batchSize) throws InterruptedException {
   }
 
@@ -156,6 +150,14 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   }
 
   @Override
+  public void setWALPosition(WALEntryBatch entryBatch) {
+  }
+
+  @Override
+  public void cleanOldWALs(String walName, boolean inclusive) {
+  }
+
+  @Override
   public ReplicationPeer getPeer() {
     return replicationPeer;
   }
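
The overrides added to ReplicationSourceDummy above imply two new methods on ReplicationSourceInterface. A hedged sketch of those declarations; only the signatures are taken from the diff, the Javadoc wording is assumed:

import org.apache.hadoop.hbase.replication.regionserver.WALEntryBatch;

interface ReplicationSourceSketch {
  /** Record the position we have replicated to for the WAL described by this batch. */
  void setWALPosition(WALEntryBatch entryBatch);

  /** Clean WALs no longer needed, up to walName; inclusive says whether walName itself goes too. */
  void cleanOldWALs(String walName, boolean inclusive);
}

This is also why the earlier WALEntryBatch hunk widens the class from package-private to public: implementations outside its package now take it as a parameter.
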
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 15f202f..a58d1a3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -126,11 +126,11 @@ public class TestReplicationSource {
       thenReturn(DoNothingReplicationEndpoint.class.getName());
     Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
     ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
-    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
+    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
     String queueId = "qid";
     RegionServerServices rss =
       TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
-    rs.init(conf, null, manager, null, mockPeer, rss, queueId, null,
+    rs.init(conf, null, null, manager, null, mockPeer, rss, queueId, null,
       p -> OptionalLong.empty(), new MetricsSource(queueId));
     try {
       rs.startup();
@@ -164,11 +164,11 @@ public class TestReplicationSource {
       thenReturn(DoNothingReplicationEndpoint.class.getName());
     Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
     ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
-    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
+    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
     String queueId = "qid";
     RegionServerServices rss =
       TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
-    rs.init(conf, null, manager, null, mockPeer, rss, queueId,
+    rs.init(conf, null, null, manager, null, mockPeer, rss, queueId,
       uuid, p -> OptionalLong.empty(), new MetricsSource(queueId));
     try {
       rs.startup();
@@ -255,8 +255,8 @@ public class TestReplicationSource {
       Configuration testConf = HBaseConfiguration.create();
       testConf.setInt("replication.source.maxretriesmultiplier", 1);
       ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
-      Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
-      source.init(testConf, null, manager, null, mockPeer, null, "testPeer",
+      Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+      source.init(testConf, null, null, manager, null, mockPeer, null, "testPeer",
         null, p -> OptionalLong.empty(), null);
       ExecutorService executor = Executors.newSingleThreadExecutor();
       Future<?> future = executor.submit(
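
A hedged restatement of the test wiring above, labeling the positional arguments of the widened init(...) call. The labels follow the signature shown in ReplicationSourceDummy earlier in the patch; the helper name is hypothetical, and the remaining types are those already imported by the test class:

import java.io.IOException;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.mockito.Mockito;

static void initWithMockedManager(ReplicationSource rs, Configuration conf,
    ReplicationPeer mockPeer, RegionServerServices rss, String queueId) throws IOException {
  ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
  // Readers now pull the shared buffer counter from the manager, so it must be stubbed.
  Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
  rs.init(conf,
      null,                      // FileSystem
      null,                      // Path walDir (the parameter this patch threads through)
      manager,
      null,                      // ReplicationQueueStorage
      mockPeer, rss, queueId,
      null,                      // UUID clusterId
      p -> OptionalLong.empty(), // WALFileLengthProvider
      new MetricsSource(queueId));
}
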
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 8e38114..4b685ce 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeMap;
@@ -333,8 +334,9 @@ public abstract class TestReplicationSourceManager {
     when(source.getQueueId()).thenReturn("1");
     when(source.isRecovered()).thenReturn(false);
     when(source.isSyncReplication()).thenReturn(false);
-    manager.logPositionAndCleanOldLogs(source,
-      new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath()));
+    WALEntryBatch batch = new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath());
+    source.setWALPosition(batch);
+    source.cleanOldWALs(batch.getLastWalPath().getName(), batch.isEndOfFile());
 
     wal.appendData(hri,
       new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
@@ -408,11 +410,10 @@ public abstract class TestReplicationSourceManager {
     assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
     String id = "1-" + server.getServerName().getServerName();
     assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
-    ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
-    when(source.getQueueId()).thenReturn(id);
-    when(source.isRecovered()).thenReturn(true);
-    when(source.isSyncReplication()).thenReturn(false);
-    manager.cleanOldLogs(file2, false, source);
+    ReplicationSourceInterface source = new ReplicationSource();
+    source.init(conf, fs, null, manager, manager.getQueueStorage(), rp1.getPeer("1"),
+      manager.getServer(), id, null, p -> OptionalLong.empty(), null);
+    source.cleanOldWALs(file2, false);
     // log1 should be deleted
     assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
   }
@@ -589,19 +590,15 @@ public abstract class TestReplicationSourceManager {
     }
   }
 
-  private ReplicationSourceInterface mockReplicationSource(String peerId) {
-    ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
-    when(source.getPeerId()).thenReturn(peerId);
-    when(source.getQueueId()).thenReturn(peerId);
-    when(source.isRecovered()).thenReturn(false);
-    when(source.isSyncReplication()).thenReturn(true);
+  private ReplicationPeer mockReplicationPeerForSyncReplication(String peerId) {
     ReplicationPeerConfig config = mock(ReplicationPeerConfig.class);
     when(config.getRemoteWALDir())
       .thenReturn(remoteLogDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
+    when(config.isSyncReplication()).thenReturn(true);
     ReplicationPeer peer = mock(ReplicationPeer.class);
     when(peer.getPeerConfig()).thenReturn(config);
-    when(source.getPeer()).thenReturn(peer);
-    return source;
+    when(peer.getId()).thenReturn(peerId);
+    return peer;
   }
 
   @Test
@@ -630,13 +627,19 @@ public abstract class TestReplicationSourceManager {
       manager.preLogRoll(wal);
       manager.postLogRoll(wal);
 
-      ReplicationSourceInterface source = mockReplicationSource(peerId2);
-      manager.cleanOldLogs(walName, true, source);
+      ReplicationSourceInterface source = new ReplicationSource();
+      source.init(conf, fs, null, manager, manager.getQueueStorage(),
+        mockReplicationPeerForSyncReplication(peerId2), manager.getServer(), peerId2, null,
+        p -> OptionalLong.empty(), null);
+      source.cleanOldWALs(walName, true);
       // still there if peer id does not match
       assertTrue(fs.exists(remoteWAL));
 
-      source = mockReplicationSource(slaveId);
-      manager.cleanOldLogs(walName, true, source);
+      source = new ReplicationSource();
+      source.init(conf, fs, null, manager, manager.getQueueStorage(),
+        mockReplicationPeerForSyncReplication(slaveId), manager.getServer(), slaveId, null,
+        p -> OptionalLong.empty(), null);
+      source.cleanOldWALs(walName, true);
       assertFalse(fs.exists(remoteWAL));
     } finally {
       removePeerAndWait(peerId2);
@@ -813,11 +816,10 @@ public abstract class TestReplicationSourceManager {
 
   static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy {
 
-    @Override
-    public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-        ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
-        UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
-        throws IOException {
+    @Override public void init(Configuration conf, FileSystem fs, Path walDir,
+      ReplicationSourceManager manager, ReplicationQueueStorage rq, ReplicationPeer rp,
+      Server server, String peerClusterId, UUID clusterId,
+      WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
       throw new IOException("Failing deliberately");
     }
   }
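
The TestReplicationSourceManager hunks above swap Mockito-stubbed sources for real ReplicationSource instances driven directly. A condensed sketch of the new call pattern; the calls come from the diff, the inline comments are assumptions:

// Build a real source, bind it to the manager, then exercise the methods the
// refactor moved off ReplicationSourceManager and onto the source itself.
ReplicationSourceInterface source = new ReplicationSource();
source.init(conf, fs, null /* walDir */, manager, manager.getQueueStorage(), peer,
    manager.getServer(), queueId, null /* clusterId */, p -> OptionalLong.empty(),
    null /* metrics */);
source.setWALPosition(batch);        // replaces manager.logPositionAndCleanOldLogs(...)
source.cleanOldWALs(walName, true);  // replaces manager.cleanOldLogs(walName, true, source)
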
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 63e7a8b..9410604 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -369,23 +369,27 @@ public class TestWALEntryStream {
   }
 
   private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) {
-    ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
-    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
-    when(mockSourceManager.getTotalBufferLimit()).thenReturn(
-        (long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
     Server mockServer = Mockito.mock(Server.class);
     ReplicationSource source = Mockito.mock(ReplicationSource.class);
-    when(source.getSourceManager()).thenReturn(mockSourceManager);
     when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
     when(source.getWALFileLengthProvider()).thenReturn(log);
     when(source.getServer()).thenReturn(mockServer);
     when(source.isRecovered()).thenReturn(recovered);
-    MetricsReplicationGlobalSourceSource globalMetrics = Mockito.mock(
-        MetricsReplicationGlobalSourceSource.class);
-    when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
+    source.manager = mockReplicationSourceManager();
     return source;
   }
 
+  private ReplicationSourceManager mockReplicationSourceManager() {
+    ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
+    MetricsReplicationGlobalSourceSource globalMetrics =
+      Mockito.mock(MetricsReplicationGlobalSourceSource.class);
+    when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
+    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    when(mockSourceManager.getTotalBufferLimit())
+      .thenReturn((long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+    return mockSourceManager;
+  }
+
   private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {
     ReplicationSource source = mockReplicationSource(recovered, conf);
     when(source.isPeerEnabled()).thenReturn(true);
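
Last, the TestWALEntryStream hunk assigns the mock straight to source.manager instead of stubbing a getSourceManager() accessor (which this patch removes), suggesting the field is now package-visible on ReplicationSource. A minimal sketch of that assumed shape:

// Assumed field visibility; same-package tests can inject a mock manager directly.
public class ReplicationSource implements ReplicationSourceInterface {
  ReplicationSourceManager manager; // package-private, replaces getSourceManager()
  // ... rest of the class unchanged ...
}

Keeping the field package-private avoids widening a public accessor purely for tests.
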