You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/11/04 16:45:49 UTC

[hbase] branch HBASE-26233 updated (50acf0f -> 817b486)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a change to branch HBASE-26233
in repository https://gitbox.apache.org/repos/asf/hbase.git.


 discard 50acf0f  HBASE-26407 Introduce a region replication sink for sinking WAL edits to secondary replicas directly (#3807)
     add cd7a509  HBASE-26311 Balancer gets stuck in cohosted replica distribution (#3724)
     add 6a78f0f  HBASE-26418 Update downloads.xml for release 2.4.8
     new 817b486  HBASE-26407 Introduce a region replication sink for sinking WAL edits to secondary replicas directly (#3807)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (50acf0f)
            \
             N -- N -- N   refs/heads/HBASE-26233 (817b486)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../master/balancer/BalancerClusterState.java      | 53 ++++++----------------
 .../hbase/master/balancer/DoubleArrayCost.java     | 23 +++++++---
 .../hbase/master/balancer/MoveCostFunction.java    |  4 +-
 .../balancer/RegionCountSkewCostFunction.java      | 15 +-----
 .../master/balancer/TableSkewCostFunction.java     | 30 +++++++++++-
 .../balancer/StochasticBalancerTestBase.java       |  1 -
 .../balancer/StochasticBalancerTestBase2.java      |  1 -
 .../master/balancer/TestBaseLoadBalancer.java      |  2 -
 .../hbase/master/balancer/TestDoubleArrayCost.java |  2 +-
 .../balancer/TestStochasticLoadBalancer.java       |  6 +--
 .../TestStochasticLoadBalancerBalanceCluster.java  |  1 -
 .../TestStochasticLoadBalancerLargeCluster.java    |  1 -
 ...ochasticLoadBalancerRegionReplicaSameHosts.java |  1 -
 ...ochasticLoadBalancerRegionReplicaWithRacks.java |  6 ---
 .../TestStochasticLoadBalancerSmallCluster.java    |  2 +-
 .../balancer/TestStochasticBalancerJmxMetrics.java |  1 -
 src/site/xdoc/downloads.xml                        | 16 +++----
 17 files changed, 76 insertions(+), 89 deletions(-)

[hbase] 01/01: HBASE-26407 Introduce a region replication sink for sinking WAL edits to secondary replicas directly (#3807)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-26233
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 817b486424a70eb7be6e94444125d7036ed234c7
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Tue Nov 2 08:42:29 2021 +0800

    HBASE-26407 Introduce a region replication sink for sinking WAL edits to secondary replicas directly (#3807)
    
    Signed-off-by: Xiaolin Ha <ha...@apache.org>
---
 .../hbase/client/AsyncClusterConnection.java       |  13 +-
 .../hbase/client/AsyncClusterConnectionImpl.java   |  17 +-
 .../AsyncRegionReplicaReplayRetryingCaller.java    | 147 ------
 .../AsyncRegionReplicationRetryingCaller.java      | 103 +++++
 .../org/apache/hadoop/hbase/ipc/RpcServer.java     |  11 +
 .../hadoop/hbase/master/janitor/MetaFixer.java     |   9 -
 .../master/procedure/CreateTableProcedure.java     |  11 -
 .../master/procedure/ModifyTableProcedure.java     |   9 -
 .../apache/hadoop/hbase/regionserver/HRegion.java  |  79 +++-
 .../apache/hadoop/hbase/regionserver/HStore.java   |   3 +-
 .../MultiVersionConcurrencyControl.java            |  24 +-
 .../hbase/regionserver/RegionReplicationSink.java  | 228 +++++++++
 .../regionserver/handler/AssignRegionHandler.java  |  12 +-
 .../handler/UnassignRegionHandler.java             |  12 +-
 .../hbase/regionserver/wal/AbstractFSWAL.java      |   3 +-
 .../hadoop/hbase/regionserver/wal/WALUtil.java     |  44 +-
 .../regionserver/CatalogReplicationSource.java     |  47 --
 .../regionserver/CatalogReplicationSourcePeer.java |  50 --
 .../RegionReplicaReplicationEndpoint.java          | 407 ----------------
 .../regionserver/ReplicationSourceFactory.java     |   1 -
 .../regionserver/ReplicationSourceManager.java     |  87 ----
 .../hadoop/hbase/util/ServerRegionReplicaUtil.java |  32 --
 .../org/apache/hadoop/hbase/TestIOFencing.java     |   4 +-
 .../hbase/client/DummyAsyncClusterConnection.java  |  13 +-
 .../hadoop/hbase/regionserver/TestHRegion.java     |   2 +-
 .../regionserver/TestRegionReplicaFailover.java    |   4 +-
 ....java => TestMetaRegionReplicaReplication.java} | 184 ++------
 .../regionserver/TestRegionReplicaReplication.java | 273 +++++++++++
 .../TestRegionReplicaReplicationEndpoint.java      | 515 ---------------------
 ...stRegionReplicaReplicationEndpointNoMaster.java | 281 -----------
 30 files changed, 790 insertions(+), 1835 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index 9502424..11c4f4f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -58,13 +58,6 @@ public interface AsyncClusterConnection extends AsyncConnection {
   CompletableFuture<FlushRegionResponse> flush(byte[] regionName, boolean writeFlushWALMarker);
 
   /**
-   * Replicate wal edits for replica regions. The return value is the edits we skipped, as the
-   * original return value is useless.
-   */
-  CompletableFuture<Long> replay(TableName tableName, byte[] encodedRegionName, byte[] row,
-      List<Entry> entries, int replicaId, int numRetries, long operationTimeoutNs);
-
-  /**
    * Return all the replicas for a region. Used for region replica replication.
    */
   CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
@@ -110,4 +103,10 @@ public interface AsyncClusterConnection extends AsyncConnection {
    * Get the bootstrap node list of another region server.
    */
   CompletableFuture<List<ServerName>> getAllBootstrapNodes(ServerName regionServer);
+
+  /**
+   * Replicate wal edits to a secondary replica.
+   */
+  CompletableFuture<Void> replicate(RegionInfo replica, List<Entry> entries, int numRetries,
+    long rpcTimeoutNs, long operationTimeoutNs);
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
index 825fbb4..789d616 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
@@ -85,14 +85,6 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu
   }
 
   @Override
-  public CompletableFuture<Long> replay(TableName tableName, byte[] encodedRegionName, byte[] row,
-      List<Entry> entries, int replicaId, int retries, long operationTimeoutNs) {
-    return new AsyncRegionReplicaReplayRetryingCaller(RETRY_TIMER, this,
-      ConnectionUtils.retries2Attempts(retries), operationTimeoutNs, tableName, encodedRegionName,
-      row, entries, replicaId).call();
-  }
-
-  @Override
   public CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
       boolean reload) {
     return getLocator().getRegionLocations(tableName, row, RegionLocateType.CURRENT, reload, -1L);
@@ -176,4 +168,13 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu
       });
     return future;
   }
+
+  @Override
+  public CompletableFuture<Void> replicate(RegionInfo replica,
+    List<Entry> entries, int retries, long rpcTimeoutNs,
+    long operationTimeoutNs) {
+    return new AsyncRegionReplicationRetryingCaller(RETRY_TIMER, this,
+      ConnectionUtils.retries2Attempts(retries), rpcTimeoutNs, operationTimeoutNs, replica, entries)
+        .call();
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicaReplayRetryingCaller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicaReplayRetryingCaller.java
deleted file mode 100644
index 0146c8b..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicaReplayRetryingCaller.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
-
-import java.io.IOException;
-import java.util.List;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
-
-/**
- * For replaying edits for region replica.
- * <p/>
- * The mainly difference here is that, every time after locating, we will check whether the region
- * name is equal, if not, we will give up, as this usually means the region has been split or
- * merged, and the new region(s) should already have all the data of the parent region(s).
- * <p/>
- * Notice that, the return value is the edits we skipped, as the original response message is not
- * used at upper layer.
- */
-@InterfaceAudience.Private
-public class AsyncRegionReplicaReplayRetryingCaller extends AsyncRpcRetryingCaller<Long> {
-
-  private static final Logger LOG =
-    LoggerFactory.getLogger(AsyncRegionReplicaReplayRetryingCaller.class);
-
-  private final TableName tableName;
-
-  private final byte[] encodedRegionName;
-
-  private final byte[] row;
-
-  private final Entry[] entries;
-
-  private final int replicaId;
-
-  public AsyncRegionReplicaReplayRetryingCaller(HashedWheelTimer retryTimer,
-      AsyncClusterConnectionImpl conn, int maxAttempts, long operationTimeoutNs,
-      TableName tableName, byte[] encodedRegionName, byte[] row, List<Entry> entries,
-      int replicaId) {
-    super(retryTimer, conn, ConnectionUtils.getPriority(tableName), conn.connConf.getPauseNs(),
-      conn.connConf.getPauseForCQTBENs(), maxAttempts, operationTimeoutNs,
-      conn.connConf.getWriteRpcTimeoutNs(), conn.connConf.getStartLogErrorsCnt());
-    this.tableName = tableName;
-    this.encodedRegionName = encodedRegionName;
-    this.row = row;
-    this.entries = entries.toArray(new Entry[0]);
-    this.replicaId = replicaId;
-  }
-
-  private void call(HRegionLocation loc) {
-    if (!Bytes.equals(encodedRegionName, loc.getRegion().getEncodedNameAsBytes())) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(
-          "Skipping {} entries in table {} because located region {} is different than" +
-            " the original region {} from WALEdit",
-          entries.length, tableName, loc.getRegion().getEncodedName(),
-          Bytes.toStringBinary(encodedRegionName));
-        for (Entry entry : entries) {
-          LOG.trace("Skipping : " + entry);
-        }
-      }
-      future.complete(Long.valueOf(entries.length));
-      return;
-    }
-
-    AdminService.Interface stub;
-    try {
-      stub = conn.getAdminStub(loc.getServerName());
-    } catch (IOException e) {
-      onError(e,
-        () -> "Get async admin stub to " + loc.getServerName() + " for '" +
-          Bytes.toStringBinary(row) + "' in " + loc.getRegion().getEncodedName() + " of " +
-          tableName + " failed",
-        err -> conn.getLocator().updateCachedLocationOnError(loc, err));
-      return;
-    }
-    Pair<ReplicateWALEntryRequest, CellScanner> p = ReplicationProtobufUtil
-      .buildReplicateWALEntryRequest(entries, encodedRegionName, null, null, null);
-    resetCallTimeout();
-    controller.setCellScanner(p.getSecond());
-    stub.replay(controller, p.getFirst(), r -> {
-      if (controller.failed()) {
-        onError(controller.getFailed(),
-          () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " +
-            loc.getRegion().getEncodedName() + " of " + tableName + " failed",
-          err -> conn.getLocator().updateCachedLocationOnError(loc, err));
-      } else {
-        future.complete(0L);
-      }
-    });
-
-  }
-
-  @Override
-  protected void doCall() {
-    long locateTimeoutNs;
-    if (operationTimeoutNs > 0) {
-      locateTimeoutNs = remainingTimeNs();
-      if (locateTimeoutNs <= 0) {
-        completeExceptionally();
-        return;
-      }
-    } else {
-      locateTimeoutNs = -1L;
-    }
-    addListener(conn.getLocator().getRegionLocation(tableName, row, replicaId,
-      RegionLocateType.CURRENT, locateTimeoutNs), (loc, error) -> {
-        if (error != null) {
-          onError(error,
-            () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed", err -> {
-            });
-          return;
-        }
-        call(loc);
-      });
-  }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java
new file mode 100644
index 0000000..a0ce418
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+
+/**
+ * For replicating edits to secondary replicas.
+ */
+@InterfaceAudience.Private
+public class AsyncRegionReplicationRetryingCaller extends AsyncRpcRetryingCaller<Void> {
+
+  private final RegionInfo replica;
+
+  private final Entry[] entries;
+
+  public AsyncRegionReplicationRetryingCaller(HashedWheelTimer retryTimer,
+    AsyncClusterConnectionImpl conn, int maxAttempts, long rpcTimeoutNs, long operationTimeoutNs,
+    RegionInfo replica, List<Entry> entries) {
+    super(retryTimer, conn, ConnectionUtils.getPriority(replica.getTable()),
+      conn.connConf.getPauseNs(), conn.connConf.getPauseForCQTBENs(), maxAttempts,
+      operationTimeoutNs, rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt());
+    this.replica = replica;
+    this.entries = entries.toArray(new Entry[0]);
+  }
+
+  private void call(HRegionLocation loc) {
+    AdminService.Interface stub;
+    try {
+      stub = conn.getAdminStub(loc.getServerName());
+    } catch (IOException e) {
+      onError(e,
+        () -> "Get async admin stub to " + loc.getServerName() + " for " + replica + " failed",
+        err -> conn.getLocator().updateCachedLocationOnError(loc, err));
+      return;
+    }
+    Pair<ReplicateWALEntryRequest, CellScanner> pair = ReplicationProtobufUtil
+      .buildReplicateWALEntryRequest(entries, replica.getEncodedNameAsBytes(), null, null, null);
+    resetCallTimeout();
+    controller.setCellScanner(pair.getSecond());
+    stub.replay(controller, pair.getFirst(), r -> {
+      if (controller.failed()) {
+        onError(controller.getFailed(),
+          () -> "Call to " + loc.getServerName() + " for " + replica + " failed",
+          err -> conn.getLocator().updateCachedLocationOnError(loc, err));
+      } else {
+        future.complete(null);
+      }
+    });
+  }
+
+  @Override
+  protected void doCall() {
+    long locateTimeoutNs;
+    if (operationTimeoutNs > 0) {
+      locateTimeoutNs = remainingTimeNs();
+      if (locateTimeoutNs <= 0) {
+        completeExceptionally();
+        return;
+      }
+    } else {
+      locateTimeoutNs = -1L;
+    }
+    addListener(conn.getLocator().getRegionLocation(replica.getTable(), replica.getStartKey(),
+      replica.getReplicaId(), RegionLocateType.CURRENT, locateTimeoutNs), (loc, error) -> {
+        if (error != null) {
+          onError(error, () -> "Locate " + replica + " failed", err -> {
+          });
+          return;
+        }
+        call(loc);
+      });
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index b41619a..9a7ba92 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -679,6 +679,17 @@ public abstract class RpcServer implements RpcServerInterface,
     return Optional.ofNullable(CurCall.get());
   }
 
+  /**
+   * Just return the current rpc call if it is a {@link ServerCall} and also has {@link CellScanner}
+   * attached.
+   * <p/>
+   * Mainly used for reference counting as {@link CellScanner} may reference non heap memory.
+   */
+  public static Optional<ServerCall<?>> getCurrentServerCallWithCellScanner() {
+    return getCurrentCall().filter(c -> c instanceof ServerCall)
+      .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall<?>) c);
+  }
+
   public static boolean isInRpcCallContext() {
     return CurCall.get() != null;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/janitor/MetaFixer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/janitor/MetaFixer.java
index 4a5aa0a..64ee49e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/janitor/MetaFixer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/janitor/MetaFixer.java
@@ -29,7 +29,6 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
-import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.TableName;
@@ -40,10 +39,8 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.exceptions.MergeRegionException;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
-import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -197,15 +194,9 @@ public class MetaFixer {
           MetaTableAccessor.addRegionsToMeta(masterServices.getConnection(), newRegions,
             td.getRegionReplication());
 
-          // Setup replication for region replicas if needed
-          if (td.getRegionReplication() > 1) {
-            ServerRegionReplicaUtil.setupRegionReplicaReplication(masterServices);
-          }
           return Either.<List<RegionInfo>, IOException> ofLeft(newRegions);
         } catch (IOException e) {
           return Either.<List<RegionInfo>, IOException> ofRight(e);
-        } catch (ReplicationException e) {
-          return Either.<List<RegionInfo>, IOException> ofRight(new HBaseIOException(e));
         }
       })
       .collect(Collectors.toList());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
index 2313e70..723f851 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
@@ -25,7 +25,6 @@ import java.util.function.Supplier;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableExistsException;
@@ -37,12 +36,10 @@ import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
-import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.ModifyRegionUtils;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -378,14 +375,6 @@ public class CreateTableProcedure
     // Add regions to META
     addRegionsToMeta(env, tableDescriptor, newRegions);
 
-    // Setup replication for region replicas if needed
-    if (tableDescriptor.getRegionReplication() > 1) {
-      try {
-        ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterServices());
-      } catch (ReplicationException e) {
-        throw new HBaseIOException(e);
-      }
-    }
     return newRegions;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
index 247dd9c..aedb42c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
@@ -38,10 +38,8 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
-import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -414,13 +412,6 @@ public class ModifyTableProcedure
         .collect(Collectors.toList());
       addChildProcedure(env.getAssignmentManager().createAssignProcedures(newReplicas));
     }
-    if (oldReplicaCount <= 1) {
-      try {
-        ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterServices());
-      } catch (ReplicationException e) {
-        throw new HBaseIOException(e);
-      }
-    }
   }
 
   private void closeExcessReplicasIfNeeded(MasterProcedureEnv env) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 8afadc7..c9d456b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
 import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
 import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
+
 import edu.umd.cs.findbugs.annotations.Nullable;
 import io.opentelemetry.api.trace.Span;
 import java.io.EOFException;
@@ -135,6 +136,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
 import org.apache.hadoop.hbase.ipc.RpcCall;
 import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.ServerCall;
 import org.apache.hadoop.hbase.mob.MobFileCache;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -177,6 +179,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -191,6 +194,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Service;
 import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
@@ -708,6 +712,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   private final StoreHotnessProtector storeHotnessProtector;
 
+  private Optional<RegionReplicationSink> regionReplicationSink = Optional.empty();
+
   /**
    * HRegion constructor. This constructor should only be used for testing and
    * extensions.  Instances of HRegion should be instantiated with the
@@ -1080,11 +1086,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       status.setStatus("Running coprocessor post-open hooks");
       coprocessorHost.postOpen();
     }
-
+    initializeRegionReplicationSink(reporter, status);
     status.markComplete("Region opened successfully");
     return nextSeqId;
   }
 
+  private void initializeRegionReplicationSink(CancelableProgressable reporter,
+    MonitoredTask status) {
+    RegionServerServices rss = getRegionServerServices();
+    TableDescriptor td = getTableDescriptor();
+    int regionReplication = td.getRegionReplication();
+    RegionInfo regionInfo = getRegionInfo();
+    if (regionReplication <= 1 || !RegionReplicaUtil.isDefaultReplica(regionInfo) ||
+      !ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(conf, regionInfo.getTable()) ||
+      rss == null) {
+      regionReplicationSink = Optional.empty();
+      return;
+    }
+    status.setStatus("Initializaing region replication sink");
+    regionReplicationSink = Optional.of(new RegionReplicationSink(conf, regionInfo,
+      regionReplication, td.hasRegionMemStoreReplication(), rss.getAsyncClusterConnection()));
+  }
+
   /**
    * Open all Stores.
    * @param reporter
@@ -1207,7 +1230,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
       getRegionServerServices().getServerName(), storeFiles);
     WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionOpenDesc,
-        mvcc);
+        mvcc, regionReplicationSink.orElse(null));
   }
 
   private void writeRegionCloseMarker(WAL wal) throws IOException {
@@ -1216,7 +1239,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), mvcc.getReadPoint(),
       getRegionServerServices().getServerName(), storeFiles);
     WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionEventDesc,
-        mvcc);
+      mvcc, null);
 
     // Store SeqId in WAL FileSystem when a region closes
     // checking region folder exists is due to many tests which delete the table folder while a
@@ -1861,7 +1884,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         RegionReplicaUtil.isDefaultReplica(getRegionInfo())) {
         writeRegionCloseMarker(wal);
       }
-
+      if (regionReplicationSink.isPresent()) {
+        // stop replicating to secondary replicas
+        RegionReplicationSink sink = regionReplicationSink.get();
+        sink.stop();
+        try {
+          regionReplicationSink.get().waitUntilStopped();
+        } catch (InterruptedException e) {
+          throw throwOnInterrupt(e);
+        }
+      }
       this.closed.set(true);
       if (!canFlush) {
         decrMemStoreSize(this.memStoreSizing.getMemStoreSize());
@@ -2822,7 +2854,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             getRegionInfo(), flushOpSeqId, committedFiles);
         // No sync. Sync is below where no updates lock and we do FlushAction.COMMIT_FLUSH
         WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false,
-            mvcc);
+          mvcc, null);
       }
 
       // Prepare flush (take a snapshot)
@@ -2883,8 +2915,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     try {
       FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
           getRegionInfo(), flushOpSeqId, committedFiles);
-      WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false,
-          mvcc);
+      WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false, mvcc,
+        null);
     } catch (Throwable t) {
       LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL: {} in "
         + " region {}", StringUtils.stringifyException(t), this);
@@ -2928,8 +2960,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH,
         getRegionInfo(), -1, new TreeMap<>(Bytes.BYTES_COMPARATOR));
       try {
-        WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true,
-            mvcc);
+        WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true, mvcc,
+          null);
         return true;
       } catch (IOException e) {
         LOG.warn(getRegionInfo().getEncodedName() + " : "
@@ -3008,8 +3040,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         // write flush marker to WAL. If fail, we should throw DroppedSnapshotException
         FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
           getRegionInfo(), flushOpSeqId, committedFiles);
-        WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true,
-            mvcc);
+        WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true, mvcc,
+          regionReplicationSink.orElse(null));
       }
     } catch (Throwable t) {
       // An exception here means that the snapshot was not persisted.
@@ -3022,7 +3054,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         try {
           FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
             getRegionInfo(), flushOpSeqId, committedFiles);
-          WALUtil.writeFlushMarker(wal, this.replicationScope, getRegionInfo(), desc, false, mvcc);
+          WALUtil.writeFlushMarker(wal, this.replicationScope, getRegionInfo(), desc, false, mvcc,
+            null);
         } catch (Throwable ex) {
           LOG.warn(getRegionInfo().getEncodedName() + " : "
               + "failed writing ABORT_FLUSH marker to WAL", ex);
@@ -7067,10 +7100,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         try {
           WALProtos.BulkLoadDescriptor loadDescriptor =
               ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
-                  UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
-                  storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
+              UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
+              storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
           WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
-              loadDescriptor, mvcc);
+            loadDescriptor, mvcc, regionReplicationSink.orElse(null));
         } catch (IOException ioe) {
           if (this.rsServices != null) {
             // Have to abort region server because some hfiles has been loaded but we can't write
@@ -7757,21 +7790,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     if (this.coprocessorHost != null && !walEdit.isMetaEdit()) {
       this.coprocessorHost.preWALAppend(walKey, walEdit);
     }
-    WriteEntry writeEntry = null;
+    ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null);
     try {
       long txid = this.wal.appendData(this.getRegionInfo(), walKey, walEdit);
+      WriteEntry writeEntry = walKey.getWriteEntry();
+      regionReplicationSink.ifPresent(sink -> writeEntry.attachCompletionAction(() -> {
+        sink.add(walKey, walEdit, rpcCall);
+      }));
       // Call sync on our edit.
       if (txid != 0) {
         sync(txid, durability);
       }
-      writeEntry = walKey.getWriteEntry();
+      return writeEntry;
     } catch (IOException ioe) {
-      if (walKey != null && walKey.getWriteEntry() != null) {
+      if (walKey.getWriteEntry() != null) {
         mvcc.complete(walKey.getWriteEntry());
       }
       throw ioe;
     }
-    return writeEntry;
+
   }
 
   public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HRegion.class, false);
@@ -8426,6 +8463,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
+  public Optional<RegionReplicationSink> getRegionReplicationSink() {
+    return regionReplicationSink;
+  }
+
   public void addReadRequestsCount(long readRequestsCount) {
     this.readRequestsCount.add(readRequestsCount);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 47cb795..dc46f6b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1574,7 +1574,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     // Does this method belong in Region altogether given it is making so many references up there?
     // Could be Region#writeCompactionMarker(compactionDescriptor);
     WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(),
-        this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC());
+      this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC(),
+      region.getRegionReplicationSink().orElse(null));
   }
 
   void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
index d821eec..91ae03d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.util.LinkedList;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -202,6 +203,7 @@ public class MultiVersionConcurrencyControl {
         if (queueFirst.isCompleted()) {
           nextReadValue = queueFirst.getWriteNumber();
           writeQueue.removeFirst();
+          queueFirst.runCompletionAction();
         } else {
           break;
         }
@@ -271,22 +273,36 @@ public class MultiVersionConcurrencyControl {
    * Every created WriteEntry must be completed by calling mvcc#complete or #completeAndWait.
    */
   @InterfaceAudience.Private
-  public static class WriteEntry {
+  public static final class WriteEntry {
     private final long writeNumber;
     private boolean completed = false;
+    /**
+     * Will be called after completion, i.e, when being removed from the
+     * {@link MultiVersionConcurrencyControl#writeQueue}.
+     */
+    private Optional<Runnable> completionAction = Optional.empty();
 
-    WriteEntry(long writeNumber) {
+    private WriteEntry(long writeNumber) {
       this.writeNumber = writeNumber;
     }
 
-    void markCompleted() {
+    private void markCompleted() {
       this.completed = true;
     }
 
-    boolean isCompleted() {
+    private boolean isCompleted() {
       return this.completed;
     }
 
+    public void attachCompletionAction(Runnable action) {
+      assert !completionAction.isPresent();
+      completionAction = Optional.of(action);
+    }
+
+    private void runCompletionAction() {
+      completionAction.ifPresent(Runnable::run);
+    }
+
     public long getWriteNumber() {
       return this.writeNumber;
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java
new file mode 100644
index 0000000..6911289
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.ipc.ServerCall;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
+/**
+ * The class for replicating WAL edits to secondary replicas, one instance per region.
+ */
+@InterfaceAudience.Private
+public class RegionReplicationSink {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RegionReplicationSink.class);
+
+  public static final String MAX_PENDING_SIZE = "hbase.region.read-replica.sink.max-pending-size";
+
+  public static final long MAX_PENDING_SIZE_DEFAULT = 10L * 1024 * 1024;
+
+  public static final String RETRIES_NUMBER = "hbase.region.read-replica.sink.retries.number";
+
+  public static final int RETRIES_NUMBER_DEFAULT = 3;
+
+  public static final String RPC_TIMEOUT_MS = "hbase.region.read-replica.sink.rpc.timeout.ms";
+
+  public static final long RPC_TIMEOUT_MS_DEFAULT = 200;
+
+  public static final String OPERATION_TIMEOUT_MS =
+    "hbase.region.read-replica.sink.operation.timeout.ms";
+
+  public static final long OPERATION_TIMEOUT_MS_DEFAULT = 1000;
+
+  private static final class SinkEntry {
+
+    final WALKeyImpl key;
+
+    final WALEdit edit;
+
+    final ServerCall<?> rpcCall;
+
+    SinkEntry(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
+      this.key = key;
+      this.edit = edit;
+      this.rpcCall = rpcCall;
+      if (rpcCall != null) {
+        // increase the reference count to avoid the rpc framework free the memory before we
+        // actually sending them out.
+        rpcCall.retainByWAL();
+      }
+    }
+
+    /**
+     * Should be called regardless of the result of the replicating operation. Unless you still want
+     * to reuse this entry, otherwise you must call this method to release the possible off heap
+     * memories.
+     */
+    void replicated() {
+      if (rpcCall != null) {
+        rpcCall.releaseByWAL();
+      }
+    }
+  }
+
+  private final RegionInfo primary;
+
+  private final int regionReplication;
+
+  private final boolean hasRegionMemStoreReplication;
+
+  private final Queue<SinkEntry> entries = new ArrayDeque<>();
+
+  private final AsyncClusterConnection conn;
+
+  private final int retries;
+
+  private final long rpcTimeoutNs;
+
+  private final long operationTimeoutNs;
+
+  private CompletableFuture<Void> future;
+
+  private boolean stopping;
+
+  private boolean stopped;
+
+  RegionReplicationSink(Configuration conf, RegionInfo primary, int regionReplication,
+    boolean hasRegionMemStoreReplication, AsyncClusterConnection conn) {
+    Preconditions.checkArgument(RegionReplicaUtil.isDefaultReplica(primary), "%s is not primary",
+      primary);
+    Preconditions.checkArgument(regionReplication > 1,
+      "region replication should be greater than 1 but got %s", regionReplication);
+    this.primary = primary;
+    this.regionReplication = regionReplication;
+    this.hasRegionMemStoreReplication = hasRegionMemStoreReplication;
+    this.conn = conn;
+    this.retries = conf.getInt(RETRIES_NUMBER, RETRIES_NUMBER_DEFAULT);
+    this.rpcTimeoutNs =
+      TimeUnit.MILLISECONDS.toNanos(conf.getLong(RPC_TIMEOUT_MS, RPC_TIMEOUT_MS_DEFAULT));
+    this.operationTimeoutNs = TimeUnit.MILLISECONDS
+      .toNanos(conf.getLong(OPERATION_TIMEOUT_MS, OPERATION_TIMEOUT_MS_DEFAULT));
+  }
+
+  private void send() {
+    List<SinkEntry> toSend = new ArrayList<>();
+    for (SinkEntry entry;;) {
+      entry = entries.poll();
+      if (entry == null) {
+        break;
+      }
+      toSend.add(entry);
+    }
+    List<WAL.Entry> walEntries =
+      toSend.stream().map(e -> new WAL.Entry(e.key, e.edit)).collect(Collectors.toList());
+    List<CompletableFuture<Void>> futures = new ArrayList<>();
+    for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
+      RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, replicaId);
+      futures.add(conn.replicate(replica, walEntries, retries, rpcTimeoutNs, operationTimeoutNs));
+    }
+    future = CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
+    FutureUtils.addListener(future, (r, e) -> {
+      if (e != null) {
+        // TODO: drop pending edits and issue a flush
+        LOG.warn("Failed to replicate to secondary replicas for {}", primary, e);
+      }
+      toSend.forEach(SinkEntry::replicated);
+      synchronized (entries) {
+        future = null;
+        if (stopping) {
+          stopped = true;
+          entries.notifyAll();
+          return;
+        }
+        if (!entries.isEmpty()) {
+          send();
+        }
+      }
+    });
+  }
+
+  /**
+   * Add this edit to replication queue.
+   * <p/>
+   * The {@code rpcCall} is for retaining the cells if the edit is built within an rpc call and the
+   * rpc call has cell scanner, which is off heap.
+   */
+  public void add(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
+    if (!hasRegionMemStoreReplication && !edit.isMetaEdit()) {
+      // only replicate meta edit if region memstore replication is not enabled
+      return;
+    }
+    synchronized (entries) {
+      if (stopping) {
+        return;
+      }
+      // TODO: limit the total cached entries here, and we should have a global limitation, not for
+      // only this region.
+      entries.add(new SinkEntry(key, edit, rpcCall));
+      if (future == null) {
+        send();
+      }
+    }
+  }
+
+  /**
+   * Stop the replication sink.
+   * <p/>
+   * Usually this should only be called when you want to close a region.
+   */
+  void stop() {
+    synchronized (entries) {
+      stopping = true;
+      if (future == null) {
+        stopped = true;
+        entries.notifyAll();
+      }
+    }
+  }
+
+  /**
+   * Make sure that we have finished all the replicating requests.
+   * <p/>
+   * After returning, we can make sure there will be no new replicating requests to secondary
+   * replicas.
+   * <p/>
+   * This is used to keep the replicating order the same with the WAL edit order when writing.
+   */
+  void waitUntilStopped() throws InterruptedException {
+    synchronized (entries) {
+      while (!stopped) {
+        entries.wait();
+      }
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
index 5d9819c..101c9c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
@@ -22,9 +22,7 @@ import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
@@ -34,10 +32,10 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices.PostOpenDeployContext;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
 import org.apache.hadoop.hbase.util.RetryCounter;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 
 /**
@@ -134,14 +132,6 @@ public class AssignRegionHandler extends EventHandler {
       // pass null for the last parameter, which used to be a CancelableProgressable, as now the
       // opening can not be interrupted by a close request any more.
       Configuration conf = rs.getConfiguration();
-      TableName tn = htd.getTableName();
-      if (ServerRegionReplicaUtil.isMetaRegionReplicaReplicationEnabled(conf, tn)) {
-        if (RegionReplicaUtil.isDefaultReplica(this.regionInfo.getReplicaId())) {
-          // Add the hbase:meta replication source on replica zero/default.
-          rs.getReplicationSourceService().getReplicationManager().
-            addCatalogReplicationSource(this.regionInfo);
-        }
-      }
       region = HRegion.openHRegion(regionInfo, htd, rs.getWAL(regionInfo), conf, rs, null);
     } catch (IOException e) {
       cleanUpAndReportFailure(e);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
index 0d02f30..2ac55ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -31,10 +30,10 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.RetryCounter;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 
 /**
@@ -121,15 +120,6 @@ public class UnassignRegionHandler extends EventHandler {
     }
 
     rs.removeRegion(region, destination);
-    if (ServerRegionReplicaUtil.isMetaRegionReplicaReplicationEnabled(rs.getConfiguration(),
-        region.getTableDescriptor().getTableName())) {
-      if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo().getReplicaId())) {
-        // If hbase:meta read replicas enabled, remove replication source for hbase:meta Regions.
-        // See assign region handler where we add the replication source on open.
-        rs.getReplicationSourceService().getReplicationManager().
-          removeCatalogReplicationSource(region.getRegionInfo());
-      }
-    }
     if (!rs.reportRegionStateTransition(
       new RegionStateTransitionContext(TransitionCode.CLOSED, HConstants.NO_SEQNUM, closeProcId,
         -1, region.getRegionInfo()))) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 002d9b7..f0e4a1f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -1164,8 +1164,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
       txidHolder.setValue(ringBuffer.next());
     });
     long txid = txidHolder.longValue();
-    ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
-      .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
+    ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null);
     try {
       FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
       entry.stampRegionSequenceId(we);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
index 23db3dc..15be95e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
@@ -28,7 +28,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
+import org.apache.hadoop.hbase.regionserver.RegionReplicationSink;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -71,9 +74,9 @@ public class WALUtil {
    */
   public static WALKeyImpl writeCompactionMarker(WAL wal,
     NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final CompactionDescriptor c,
-    MultiVersionConcurrencyControl mvcc) throws IOException {
+    MultiVersionConcurrencyControl mvcc, RegionReplicationSink sink) throws IOException {
     WALKeyImpl walKey =
-      writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null);
+      writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null, sink);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
     }
@@ -86,10 +89,10 @@ public class WALUtil {
    * This write is for internal use only. Not for external client consumption.
    */
   public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap<byte[], Integer> replicationScope,
-    RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
-    throws IOException {
+    RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc,
+    RegionReplicationSink sink) throws IOException {
     WALKeyImpl walKey = doFullMarkerAppendTransaction(wal, replicationScope, hri,
-      WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync);
+      WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync, sink);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
     }
@@ -102,9 +105,9 @@ public class WALUtil {
    */
   public static WALKeyImpl writeRegionEventMarker(WAL wal,
     NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, RegionEventDescriptor r,
-    MultiVersionConcurrencyControl mvcc) throws IOException {
-    WALKeyImpl walKey =
-      writeMarker(wal, replicationScope, hri, WALEdit.createRegionEventWALEdit(hri, r), mvcc, null);
+    MultiVersionConcurrencyControl mvcc, RegionReplicationSink sink) throws IOException {
+    WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
+      WALEdit.createRegionEventWALEdit(hri, r), mvcc, null, sink);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
     }
@@ -122,11 +125,11 @@ public class WALUtil {
    * @throws IOException We will throw an IOException if we can not append to the HLog.
    */
   public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal,
-      final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
-      final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc)
-    throws IOException {
+    final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
+    final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc,
+    final RegionReplicationSink sink) throws IOException {
     WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
-      WALEdit.createBulkLoadEvent(hri, desc), mvcc, null);
+      WALEdit.createBulkLoadEvent(hri, desc), mvcc, null, sink);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
     }
@@ -135,11 +138,11 @@ public class WALUtil {
 
   private static WALKeyImpl writeMarker(final WAL wal,
     final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, final WALEdit edit,
-    final MultiVersionConcurrencyControl mvcc, final Map<String, byte[]> extendedAttributes)
-    throws IOException {
+    final MultiVersionConcurrencyControl mvcc, final Map<String, byte[]> extendedAttributes,
+    final RegionReplicationSink sink) throws IOException {
     // If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT
     return doFullMarkerAppendTransaction(wal, replicationScope, hri, edit, mvcc, extendedAttributes,
-      true);
+      true, sink);
   }
 
   /**
@@ -152,19 +155,24 @@ public class WALUtil {
    */
   private static WALKeyImpl doFullMarkerAppendTransaction(final WAL wal,
     final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, final WALEdit edit,
-    final MultiVersionConcurrencyControl mvcc,
-    final Map<String, byte[]> extendedAttributes, final boolean sync) throws IOException {
+    final MultiVersionConcurrencyControl mvcc, final Map<String, byte[]> extendedAttributes,
+    final boolean sync, final RegionReplicationSink sink) throws IOException {
     // TODO: Pass in current time to use?
     WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(),
       EnvironmentEdgeManager.currentTime(), mvcc, replicationScope, extendedAttributes);
     long trx = MultiVersionConcurrencyControl.NONE;
     try {
       trx = wal.appendMarker(hri, walKey, edit);
+      WriteEntry writeEntry = walKey.getWriteEntry();
+      if (sink != null) {
+        writeEntry.attachCompletionAction(() -> sink.add(walKey, edit,
+          RpcServer.getCurrentServerCallWithCellScanner().orElse(null)));
+      }
       if (sync) {
         wal.sync(trx);
       }
       // Call complete only here because these are markers only. They are not for clients to read.
-      mvcc.complete(walKey.getWriteEntry());
+      mvcc.complete(writeEntry);
     } catch (IOException ioe) {
       if (walKey.getWriteEntry() != null) {
         mvcc.complete(walKey.getWriteEntry());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java
deleted file mode 100644
index 8cb7860..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import java.util.Collections;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * ReplicationSource that reads catalog WAL files -- e.g. hbase:meta WAL files -- and lets through
- * all WALEdits from these WALs. This ReplicationSource is NOT created via
- * {@link ReplicationSourceFactory}.
- */
-@InterfaceAudience.Private
-class CatalogReplicationSource extends ReplicationSource {
-  CatalogReplicationSource() {
-    // Filters in hbase:meta WAL files and allows all edits, including 'meta' edits (these are
-    // filtered out in the 'super' class default implementation).
-    super(p -> AbstractFSWALProvider.isMetaFile(p), Collections.emptyList());
-  }
-
-  @Override
-  public void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {
-    // Noop. This CatalogReplicationSource implementation does not persist state to backing storage
-    // nor does it keep its WALs in a general map up in ReplicationSourceManager --
-    // CatalogReplicationSource is used by the Catalog Read Replica feature which resets everytime
-    // the WAL source process crashes. Skip calling through to the default implementation.
-    // See "4.1 Skip maintaining zookeeper replication queue (offsets/WALs)" in the
-    // design doc attached to HBASE-18070 'Enable memstore replication for meta replica for detail'
-    // for background on why no need to keep WAL state.
-  }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSourcePeer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSourcePeer.java
deleted file mode 100644
index 3bcd414..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSourcePeer.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
-import org.apache.hadoop.hbase.replication.SyncReplicationState;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * The 'peer' used internally by Catalog Region Replicas Replication Source.
- * The Replication system has 'peer' baked into its core so though we do not need 'peering', we
- * need a 'peer' and its configuration else the replication system breaks at a few locales.
- * Set "hbase.region.replica.catalog.replication" if you want to change the configured endpoint.
- */
-@InterfaceAudience.Private
-class CatalogReplicationSourcePeer extends ReplicationPeerImpl {
-  /**
-   * @param clusterKey Usually the UUID from zk passed in by caller as a String.
-   */
-  CatalogReplicationSourcePeer(Configuration configuration, String clusterKey) {
-    super(configuration, ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER + "_catalog",
-      ReplicationPeerConfig.newBuilder().
-        setClusterKey(clusterKey).
-        setReplicationEndpointImpl(
-          configuration.get("hbase.region.replica.catalog.replication",
-            RegionReplicaReplicationEndpoint.class.getName())).
-        setBandwidth(0). // '0' means no bandwidth.
-        setSerial(false).
-        build(),
-      true, SyncReplicationState.NONE, SyncReplicationState.NONE);
-  }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
deleted file mode 100644
index 17e7a53..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ /dev/null
@@ -1,407 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.TreeMap;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.TableDescriptors;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.AsyncClusterConnection;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.WALEntryFilter;
-import org.apache.hadoop.hbase.util.AtomicUtils;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FutureUtils;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.RetryCounter;
-import org.apache.hadoop.hbase.util.RetryCounterFactory;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
-import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
-import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
-import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
-
-/**
- * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint which receives the WAL
- * edits from the WAL, and sends the edits to replicas of regions.
- */
-@InterfaceAudience.Private
-public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
-
-  private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaReplicationEndpoint.class);
-
-  // Can be configured differently than hbase.client.retries.number
-  private static String CLIENT_RETRIES_NUMBER =
-    "hbase.region.replica.replication.client.retries.number";
-
-  private Configuration conf;
-  private AsyncClusterConnection connection;
-  private TableDescriptors tableDescriptors;
-
-  private int numRetries;
-
-  private long operationTimeoutNs;
-
-  private LoadingCache<TableName, Optional<TableDescriptor>> tableDescriptorCache;
-
-  private Cache<TableName, TableName> disabledTableCache;
-
-  private final RetryCounterFactory retryCounterFactory =
-    new RetryCounterFactory(Integer.MAX_VALUE, 1000, 60000);
-
-  @Override
-  public void init(Context context) throws IOException {
-    super.init(context);
-    this.conf = context.getConfiguration();
-    this.tableDescriptors = context.getTableDescriptors();
-    int memstoreReplicationEnabledCacheExpiryMs = conf
-      .getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000);
-    // A cache for the table "memstore replication enabled" flag.
-    // It has a default expiry of 5 sec. This means that if the table is altered
-    // with a different flag value, we might miss to replicate for that amount of
-    // time. But this cache avoid the slow lookup and parsing of the TableDescriptor.
-    tableDescriptorCache = CacheBuilder.newBuilder()
-      .expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, TimeUnit.MILLISECONDS)
-      .initialCapacity(10).maximumSize(1000)
-      .build(new CacheLoader<TableName, Optional<TableDescriptor>>() {
-
-        @Override
-        public Optional<TableDescriptor> load(TableName tableName) throws Exception {
-          // check if the table requires memstore replication
-          // some unit-test drop the table, so we should do a bypass check and always replicate.
-          return Optional.ofNullable(tableDescriptors.get(tableName));
-        }
-      });
-    int nonExistentTableCacheExpiryMs =
-      conf.getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
-    // A cache for non existing tables that have a default expiry of 5 sec. This means that if the
-    // table is created again with the same name, we might miss to replicate for that amount of
-    // time. But this cache prevents overloading meta requests for every edit from a deleted file.
-    disabledTableCache = CacheBuilder.newBuilder()
-      .expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS).initialCapacity(10)
-      .maximumSize(1000).build();
-    // HRS multiplies client retries by 10 globally for meta operations, but we do not want this.
-    // We are resetting it here because we want default number of retries (35) rather than 10 times
-    // that which makes very long retries for disabled tables etc.
-    int defaultNumRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
-    if (defaultNumRetries > 10) {
-      int mult = conf.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER,
-        HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER);
-      defaultNumRetries = defaultNumRetries / mult; // reset if HRS has multiplied this already
-    }
-    this.numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries);
-    // use the regular RPC timeout for replica replication RPC's
-    this.operationTimeoutNs =
-      TimeUnit.MILLISECONDS.toNanos(conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
-        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
-    this.connection = context.getServer().getAsyncClusterConnection();
-  }
-
-  /**
-   * returns true if the specified entry must be replicated. We should always replicate meta
-   * operations (e.g. flush) and use the user HTD flag to decide whether or not replicate the
-   * memstore.
-   */
-  private boolean requiresReplication(Optional<TableDescriptor> tableDesc,
-      Entry entry) {
-    // empty edit does not need to be replicated
-    if (entry.getEdit().isEmpty() || !tableDesc.isPresent()) {
-      return false;
-    }
-    // meta edits (e.g. flush) must be always replicated
-    return entry.getEdit().isMetaEdit() || tableDesc.get().hasRegionMemStoreReplication();
-  }
-
-  private void getRegionLocations(CompletableFuture<RegionLocations> future,
-      TableDescriptor tableDesc, byte[] encodedRegionName, byte[] row, boolean reload) {
-    FutureUtils.addListener(connection.getRegionLocations(tableDesc.getTableName(), row, reload),
-      (locs, e) -> {
-        if (e != null) {
-          future.completeExceptionally(e);
-          return;
-        }
-        // if we are not loading from cache, just return
-        if (reload) {
-          future.complete(locs);
-          return;
-        }
-        // check if the number of region replicas is correct, and also the primary region name
-        // matches.
-        if (locs.size() == tableDesc.getRegionReplication() &&
-          locs.getDefaultRegionLocation() != null &&
-          Bytes.equals(locs.getDefaultRegionLocation().getRegion().getEncodedNameAsBytes(),
-            encodedRegionName)) {
-          future.complete(locs);
-        } else {
-          // reload again as the information in cache maybe stale
-          getRegionLocations(future, tableDesc, encodedRegionName, row, true);
-        }
-      });
-  }
-
-  private void replicate(CompletableFuture<Long> future, RegionLocations locs,
-      TableDescriptor tableDesc, byte[] encodedRegionName, byte[] row, List<Entry> entries) {
-    if (locs.size() == 1) {
-      LOG.info("Only one location for {}.{}, refresh the location cache only for meta now",
-        tableDesc.getTableName(), Bytes.toString(encodedRegionName));
-
-      // This could happen to meta table. In case of meta table comes with no replica and
-      // later it is changed to multiple replicas. The cached location for meta may only has
-      // the primary region. In this case, it needs to clean up and refresh the cached meta
-      // locations.
-      if (tableDesc.isMetaTable()) {
-        connection.getRegionLocator(tableDesc.getTableName()).clearRegionLocationCache();
-      }
-      future.complete(Long.valueOf(entries.size()));
-      return;
-    }
-    RegionInfo defaultReplica = locs.getDefaultRegionLocation().getRegion();
-    if (!Bytes.equals(defaultReplica.getEncodedNameAsBytes(), encodedRegionName)) {
-      // the region name is not equal, this usually means the region has been split or merged, so
-      // give up replicating as the new region(s) should already have all the data of the parent
-      // region(s).
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(
-          "Skipping {} entries in table {} because located region {} is different than" +
-            " the original region {} from WALEdit",
-          tableDesc.getTableName(), defaultReplica.getEncodedName(),
-          Bytes.toStringBinary(encodedRegionName));
-      }
-      future.complete(Long.valueOf(entries.size()));
-      return;
-    }
-    AtomicReference<Throwable> error = new AtomicReference<>();
-    AtomicInteger remainingTasks = new AtomicInteger(locs.size() - 1);
-    AtomicLong skippedEdits = new AtomicLong(0);
-
-    for (int i = 1, n = locs.size(); i < n; i++) {
-      // Do not use the elements other than the default replica as they may be null. We will fail
-      // earlier if the location for default replica is null.
-      final RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(defaultReplica, i);
-      FutureUtils
-        .addListener(connection.replay(tableDesc.getTableName(), replica.getEncodedNameAsBytes(),
-          row, entries, replica.getReplicaId(), numRetries, operationTimeoutNs), (r, e) -> {
-            if (e != null) {
-              LOG.warn("Failed to replicate to {}", replica, e);
-              error.compareAndSet(null, e);
-            } else {
-              AtomicUtils.updateMax(skippedEdits, r.longValue());
-            }
-            if (remainingTasks.decrementAndGet() == 0) {
-              if (error.get() != null) {
-                future.completeExceptionally(error.get());
-              } else {
-                future.complete(skippedEdits.get());
-              }
-            }
-          });
-    }
-  }
-
-  private void logSkipped(TableName tableName, List<Entry> entries, String reason) {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Skipping {} entries because table {} is {}", entries.size(), tableName, reason);
-      for (Entry entry : entries) {
-        LOG.trace("Skipping : {}", entry);
-      }
-    }
-  }
-
-  private CompletableFuture<Long> replicate(TableDescriptor tableDesc, byte[] encodedRegionName,
-      List<Entry> entries) {
-    if (disabledTableCache.getIfPresent(tableDesc.getTableName()) != null) {
-      logSkipped(tableDesc.getTableName(), entries, "cached as a disabled table");
-      return CompletableFuture.completedFuture(Long.valueOf(entries.size()));
-    }
-    byte[] row = CellUtil.cloneRow(entries.get(0).getEdit().getCells().get(0));
-    CompletableFuture<RegionLocations> locateFuture = new CompletableFuture<>();
-    getRegionLocations(locateFuture, tableDesc, encodedRegionName, row, false);
-    CompletableFuture<Long> future = new CompletableFuture<>();
-    FutureUtils.addListener(locateFuture, (locs, error) -> {
-      if (error != null) {
-        future.completeExceptionally(error);
-      } else if (locs.getDefaultRegionLocation() == null) {
-        future.completeExceptionally(
-          new HBaseIOException("No location found for default replica of table=" +
-            tableDesc.getTableName() + " row='" + Bytes.toStringBinary(row) + "'"));
-      } else {
-        replicate(future, locs, tableDesc, encodedRegionName, row, entries);
-      }
-    });
-    return future;
-  }
-
-  @Override
-  public boolean replicate(ReplicateContext replicateContext) {
-    Map<byte[], Pair<TableDescriptor, List<Entry>>> encodedRegionName2Entries =
-      new TreeMap<>(Bytes.BYTES_COMPARATOR);
-    long skippedEdits = 0;
-    RetryCounter retryCounter = retryCounterFactory.create();
-    outer: while (isRunning()) {
-      encodedRegionName2Entries.clear();
-      skippedEdits = 0;
-      for (Entry entry : replicateContext.getEntries()) {
-        Optional<TableDescriptor> tableDesc;
-        try {
-          tableDesc = tableDescriptorCache.get(entry.getKey().getTableName());
-        } catch (ExecutionException e) {
-          LOG.warn("Failed to load table descriptor for {}, attempts={}",
-            entry.getKey().getTableName(), retryCounter.getAttemptTimes(), e.getCause());
-          if (!retryCounter.shouldRetry()) {
-            return false;
-          }
-          try {
-            retryCounter.sleepUntilNextRetry();
-          } catch (InterruptedException e1) {
-            // restore the interrupted state
-            Thread.currentThread().interrupt();
-            return false;
-          }
-          continue outer;
-        }
-        if (!requiresReplication(tableDesc, entry)) {
-          skippedEdits++;
-          continue;
-        }
-        byte[] encodedRegionName = entry.getKey().getEncodedRegionName();
-        encodedRegionName2Entries
-          .computeIfAbsent(encodedRegionName, k -> Pair.newPair(tableDesc.get(), new ArrayList<>()))
-          .getSecond().add(entry);
-      }
-      break;
-    }
-    // send the request to regions
-    retryCounter = retryCounterFactory.create();
-    while (isRunning()) {
-      List<Pair<CompletableFuture<Long>, byte[]>> futureAndEncodedRegionNameList =
-        new ArrayList<Pair<CompletableFuture<Long>, byte[]>>();
-      for (Map.Entry<byte[], Pair<TableDescriptor, List<Entry>>> entry : encodedRegionName2Entries
-        .entrySet()) {
-        CompletableFuture<Long> future =
-          replicate(entry.getValue().getFirst(), entry.getKey(), entry.getValue().getSecond());
-        futureAndEncodedRegionNameList.add(Pair.newPair(future, entry.getKey()));
-      }
-      for (Pair<CompletableFuture<Long>, byte[]> pair : futureAndEncodedRegionNameList) {
-        byte[] encodedRegionName = pair.getSecond();
-        try {
-          skippedEdits += pair.getFirst().get().longValue();
-          encodedRegionName2Entries.remove(encodedRegionName);
-        } catch (InterruptedException e) {
-          // restore the interrupted state
-          Thread.currentThread().interrupt();
-          return false;
-        } catch (ExecutionException e) {
-          Pair<TableDescriptor, List<Entry>> tableAndEntries =
-            encodedRegionName2Entries.get(encodedRegionName);
-          TableName tableName = tableAndEntries.getFirst().getTableName();
-          List<Entry> entries = tableAndEntries.getSecond();
-          Throwable cause = e.getCause();
-          // The table can be disabled or dropped at this time. For disabled tables, we have no
-          // cheap mechanism to detect this case because meta does not contain this information.
-          // ClusterConnection.isTableDisabled() is a zk call which we cannot do for every replay
-          // RPC. So instead we start the replay RPC with retries and check whether the table is
-          // dropped or disabled which might cause SocketTimeoutException, or
-          // RetriesExhaustedException or similar if we get IOE.
-          if (cause instanceof TableNotFoundException) {
-            // add to cache that the table does not exist
-            tableDescriptorCache.put(tableName, Optional.empty());
-            logSkipped(tableName, entries, "dropped");
-            skippedEdits += entries.size();
-            encodedRegionName2Entries.remove(encodedRegionName);
-            continue;
-          }
-          boolean disabled = false;
-          try {
-            disabled = connection.getAdmin().isTableDisabled(tableName).get();
-          } catch (InterruptedException e1) {
-            // restore the interrupted state
-            Thread.currentThread().interrupt();
-            return false;
-          } catch (ExecutionException e1) {
-            LOG.warn("Failed to test whether {} is disabled, assume it is not disabled", tableName,
-              e1.getCause());
-          }
-          if (disabled) {
-            disabledTableCache.put(tableName, tableName);
-            logSkipped(tableName, entries, "disabled");
-            skippedEdits += entries.size();
-            encodedRegionName2Entries.remove(encodedRegionName);
-            continue;
-          }
-          LOG.warn("Failed to replicate {} entries for region {} of table {}", entries.size(),
-            Bytes.toStringBinary(encodedRegionName), tableName);
-        }
-      }
-      // we have done
-      if (encodedRegionName2Entries.isEmpty()) {
-        ctx.getMetrics().incrLogEditsFiltered(skippedEdits);
-        return true;
-      } else {
-        LOG.warn("Failed to replicate all entries, retry={}", retryCounter.getAttemptTimes());
-        if (!retryCounter.shouldRetry()) {
-          return false;
-        }
-        try {
-          retryCounter.sleepUntilNextRetry();
-        } catch (InterruptedException e) {
-          // restore the interrupted state
-          Thread.currentThread().interrupt();
-          return false;
-        }
-      }
-    }
-
-    return false;
-  }
-
-  @Override
-  public boolean canReplicateToSameCluster() {
-    return true;
-  }
-
-  @Override
-  protected WALEntryFilter getScopeWALEntryFilter() {
-    // we do not care about scope. We replicate everything.
-    return null;
-  }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
index 8863f14..b055902 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
@@ -27,7 +27,6 @@ import org.slf4j.LoggerFactory;
 /**
  * Constructs a {@link ReplicationSourceInterface}
  * Note, not used to create specialized ReplicationSources
- * @see CatalogReplicationSource
  */
 @InterfaceAudience.Private
 public final class ReplicationSourceFactory {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 73efcfe..9f8d8dc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -39,7 +39,6 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -49,8 +48,6 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
@@ -64,9 +61,7 @@ import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
-import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -183,16 +178,6 @@ public class ReplicationSourceManager {
   private final MetricsReplicationGlobalSourceSource globalMetrics;
 
   /**
-   * A special ReplicationSource for hbase:meta Region Read Replicas.
-   * Usually this reference remains empty. If an hbase:meta Region is opened on this server, we
-   * will create an instance of a hbase:meta CatalogReplicationSource and it will live the life of
-   * the Server thereafter; i.e. we will not shut it down even if the hbase:meta moves away from
-   * this server (in case it later gets moved back). We synchronize on this instance testing for
-   * presence and if absent, while creating so only created and started once.
-   */
-  AtomicReference<ReplicationSourceInterface> catalogReplicationSource = new AtomicReference<>();
-
-  /**
    * Creates a replication manager and sets the watch on all the other registered region servers
    * @param queueStorage the interface for manipulating replication queues
    * @param conf the configuration to use
@@ -1066,78 +1051,6 @@ public class ReplicationSourceManager {
     return this.globalMetrics;
   }
 
-  /**
-   * Add an hbase:meta Catalog replication source. Called on open of an hbase:meta Region.
-   * Create it once only. If exists already, use the existing one.
-   * @see #removeCatalogReplicationSource(RegionInfo)
-   * @see #addSource(String) This is specialization on the addSource method.
-   */
-  public ReplicationSourceInterface addCatalogReplicationSource(RegionInfo regionInfo)
-      throws IOException {
-    // Poor-man's putIfAbsent
-    synchronized (this.catalogReplicationSource) {
-      ReplicationSourceInterface rs = this.catalogReplicationSource.get();
-      return rs != null ? rs :
-        this.catalogReplicationSource.getAndSet(createCatalogReplicationSource(regionInfo));
-    }
-  }
-
-  /**
-   * Remove the hbase:meta Catalog replication source.
-   * Called when we close hbase:meta.
-   * @see #addCatalogReplicationSource(RegionInfo regionInfo)
-   */
-  public void removeCatalogReplicationSource(RegionInfo regionInfo) {
-    // Nothing to do. Leave any CatalogReplicationSource in place in case an hbase:meta Region
-    // comes back to this server.
-  }
-
-  /**
-   * Create, initialize, and start the Catalog ReplicationSource.
-   * Presumes called one-time only (caller must ensure one-time only call).
-   * This ReplicationSource is NOT created via {@link ReplicationSourceFactory}.
-   * @see #addSource(String) This is a specialization of the addSource call.
-   * @see #catalogReplicationSource for a note on this ReplicationSource's lifecycle (and more on
-   *    why the special handling).
-   */
-  private ReplicationSourceInterface createCatalogReplicationSource(RegionInfo regionInfo)
-      throws IOException {
-    // Instantiate meta walProvider. Instantiated here or over in the #warmupRegion call made by the
-    // Master on a 'move' operation. Need to do extra work if we did NOT instantiate the provider.
-    WALProvider walProvider = this.walFactory.getMetaWALProvider();
-    boolean instantiate = walProvider == null;
-    if (instantiate) {
-      walProvider = this.walFactory.getMetaProvider();
-    }
-    // Here we do a specialization on what {@link ReplicationSourceFactory} does. There is no need
-    // for persisting offset into WALs up in zookeeper (via ReplicationQueueInfo) as the catalog
-    // read replicas feature that makes use of the source does a reset on a crash of the WAL
-    // source process. See "4.1 Skip maintaining zookeeper replication queue (offsets/WALs)" in the
-    // design doc attached to HBASE-18070 'Enable memstore replication for meta replica' for detail.
-    CatalogReplicationSourcePeer peer = new CatalogReplicationSourcePeer(this.conf,
-      this.clusterId.toString());
-    final ReplicationSourceInterface crs = new CatalogReplicationSource();
-    crs.init(conf, fs, this, new NoopReplicationQueueStorage(), peer, server, peer.getId(),
-      clusterId, walProvider.getWALFileLengthProvider(), new MetricsSource(peer.getId()));
-    // Add listener on the provider so we can pick up the WAL to replicate on roll.
-    WALActionsListener listener = new WALActionsListener() {
-      @Override public void postLogRoll(Path oldPath, Path newPath) throws IOException {
-        crs.enqueueLog(newPath);
-      }
-    };
-    walProvider.addWALActionsListener(listener);
-    if (!instantiate) {
-      // If we did not instantiate provider, need to add our listener on already-created WAL
-      // instance too (listeners are passed by provider to WAL instance on creation but if provider
-      // created already, our listener add above is missed). And add the current WAL file to the
-      // Replication Source so it can start replicating it.
-      WAL wal = walProvider.getWAL(regionInfo);
-      wal.registerWALActionsListener(listener);
-      crs.enqueueLog(((AbstractFSWAL)wal).getCurrentFileName());
-    }
-    return crs.startup();
-  }
-
   ReplicationQueueStorage getQueueStorage() {
     return queueStorage;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
index 5583a47..6a46bd4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
@@ -22,23 +22,15 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.Reference;
-import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Similar to {@link RegionReplicaUtil} but for the server side
@@ -46,8 +38,6 @@ import org.slf4j.LoggerFactory;
 @InterfaceAudience.Private
 public class ServerRegionReplicaUtil extends RegionReplicaUtil {
 
-  private static final Logger LOG = LoggerFactory.getLogger(ServerRegionReplicaUtil.class);
-
   /**
    * Whether asynchronous WAL replication to the secondary region replicas is enabled or not.
    * If this is enabled, a replication peer named "region_replica_replication" will be created
@@ -59,7 +49,6 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
   public static final String REGION_REPLICA_REPLICATION_CONF_KEY
     = "hbase.region.replica.replication.enabled";
   private static final boolean DEFAULT_REGION_REPLICA_REPLICATION = false;
-  public static final String REGION_REPLICA_REPLICATION_PEER = "region_replica_replication";
 
   /**
    * Same as for {@link #REGION_REPLICA_REPLICATION_CONF_KEY} but for catalog replication.
@@ -162,27 +151,6 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
   }
 
   /**
-   * Create replication peer for replicating user-space Region Read Replicas.
-   * This methods should only be called at master side.
-   */
-  public static void setupRegionReplicaReplication(MasterServices services)
-    throws IOException, ReplicationException {
-    if (!isRegionReplicaReplicationEnabled(services.getConfiguration())) {
-      return;
-    }
-    if (services.getReplicationPeerManager().getPeerConfig(REGION_REPLICA_REPLICATION_PEER)
-      .isPresent()) {
-      return;
-    }
-    LOG.info("Region replica replication peer id=" + REGION_REPLICA_REPLICATION_PEER +
-      " not exist. Creating...");
-    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
-      .setClusterKey(ZKConfig.getZooKeeperClusterKey(services.getConfiguration()))
-      .setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName()).build();
-    services.addReplicationPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig, true);
-  }
-
-  /**
    * @return True if Region Read Replica is enabled for <code>tn</code> (whether hbase:meta or
    *   user-space tables).
    */
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index 3c2bc3f..fe16c31 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -296,8 +296,8 @@ public class TestIOFencing {
         FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
         new Path("store_dir"));
       WALUtil.writeCompactionMarker(compactingRegion.getWAL(),
-          ((HRegion)compactingRegion).getReplicationScope(),
-        oldHri, compactionDescriptor, compactingRegion.getMVCC());
+        ((HRegion) compactingRegion).getReplicationScope(), oldHri, compactionDescriptor,
+        compactingRegion.getMVCC(), null);
 
       // Wait till flush has happened, otherwise there won't be multiple store files
       long startWaitTime = EnvironmentEdgeManager.currentTime();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
index 4b10110..ef3511c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
@@ -126,12 +126,6 @@ public class DummyAsyncClusterConnection implements AsyncClusterConnection {
   }
 
   @Override
-  public CompletableFuture<Long> replay(TableName tableName, byte[] encodedRegionName, byte[] row,
-      List<Entry> entries, int replicaId, int numRetries, long operationTimeoutNs) {
-    return null;
-  }
-
-  @Override
   public CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
       boolean reload) {
     return null;
@@ -169,4 +163,11 @@ public class DummyAsyncClusterConnection implements AsyncClusterConnection {
   public CompletableFuture<List<ServerName>> getAllBootstrapNodes(ServerName regionServer) {
     return null;
   }
+
+  @Override
+  public CompletableFuture<Void> replicate(RegionInfo replica,
+    List<Entry> entries, int numRetries, long rpcTimeoutNs,
+    long operationTimeoutNs) {
+    return null;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index a5584ff..187dca5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -996,7 +996,7 @@ public class TestHRegion {
             region.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
 
       WALUtil.writeCompactionMarker(region.getWAL(), this.region.getReplicationScope(),
-          this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
+          this.region.getRegionInfo(), compactionDescriptor, region.getMVCC(), null);
 
       Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java
index ea6589d..ba7e9d1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.client.Consistency;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.replication.regionserver.TestRegionReplicaReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.regionserver.TestRegionReplicaReplication;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
@@ -64,7 +64,7 @@ public class TestRegionReplicaFailover {
       HBaseClassTestRule.forClass(TestRegionReplicaFailover.class);
 
   private static final Logger LOG =
-      LoggerFactory.getLogger(TestRegionReplicaReplicationEndpoint.class);
+      LoggerFactory.getLogger(TestRegionReplicaReplication.class);
 
   private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java
similarity index 74%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java
index 5a06a11..b501ab2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java
@@ -20,14 +20,13 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Objects;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -36,7 +35,6 @@ import org.apache.hadoop.hbase.ClientMetaTableAccessor;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
@@ -45,7 +43,6 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
@@ -67,18 +64,17 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Tests RegionReplicaReplicationEndpoint class for hbase:meta by setting up region replicas and
- * verifying async wal replication replays the edits to the secondary region in various scenarios.
- *
- * @see TestRegionReplicaReplicationEndpoint
+ * Tests region replication for hbase:meta by setting up region replicas and verifying async wal
+ * replication replays the edits to the secondary region in various scenarios.
+ * @see TestRegionReplicaReplication
  */
-@Category({LargeTests.class})
-public class TestMetaRegionReplicaReplicationEndpoint {
+@Category({ LargeTests.class })
+public class TestMetaRegionReplicaReplication {
+
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestMetaRegionReplicaReplicationEndpoint.class);
-  private static final Logger LOG =
-    LoggerFactory.getLogger(TestMetaRegionReplicaReplicationEndpoint.class);
+    HBaseClassTestRule.forClass(TestMetaRegionReplicaReplication.class);
+  private static final Logger LOG = LoggerFactory.getLogger(TestMetaRegionReplicaReplication.class);
   private static final int NB_SERVERS = 4;
   private final HBaseTestingUtil HTU = new HBaseTestingUtil();
   private int numOfMetaReplica = NB_SERVERS - 1;
@@ -102,17 +98,15 @@ public class TestMetaRegionReplicaReplicationEndpoint {
     conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
     conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
     // Enable hbase:meta replication.
-    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY,
-      true);
+    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
     // Set hbase:meta replicas to be 3.
     // conf.setInt(HConstants.META_REPLICAS_NUM, numOfMetaReplica);
     HTU.startMiniCluster(NB_SERVERS);
     // Enable hbase:meta replication.
     HBaseTestingUtil.setReplicas(HTU.getAdmin(), TableName.META_TABLE_NAME, numOfMetaReplica);
 
-    HTU.waitFor(30000,
-      () -> HTU.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME).size()
-      >= numOfMetaReplica);
+    HTU.waitFor(30000, () -> HTU.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME)
+      .size() >= numOfMetaReplica);
   }
 
   @After
@@ -121,83 +115,19 @@ public class TestMetaRegionReplicaReplicationEndpoint {
   }
 
   /**
-   * Assert that the ReplicationSource for hbase:meta gets created when hbase:meta is opened.
-   */
-  @Test
-  public void testHBaseMetaReplicationSourceCreatedOnOpen() throws Exception {
-    SingleProcessHBaseCluster cluster = HTU.getMiniHBaseCluster();
-    HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta());
-    // Replicate a row to prove all working.
-    testHBaseMetaReplicatesOneRow(0);
-    assertTrue(isMetaRegionReplicaReplicationSource(hrs));
-    // Now move the hbase:meta and make sure the ReplicationSource is in both places.
-    HRegionServer hrsOther = null;
-    for (int i = 0; i < cluster.getNumLiveRegionServers(); i++) {
-      hrsOther = cluster.getRegionServer(i);
-      if (hrsOther.getServerName().equals(hrs.getServerName())) {
-        hrsOther = null;
-        continue;
-      }
-      break;
-    }
-    assertNotNull(hrsOther);
-    assertFalse(isMetaRegionReplicaReplicationSource(hrsOther));
-    Region meta = null;
-    for (Region region : hrs.getOnlineRegionsLocalContext()) {
-      if (region.getRegionInfo().isMetaRegion()) {
-        meta = region;
-        break;
-      }
-    }
-    assertNotNull(meta);
-    HTU.moveRegionAndWait(meta.getRegionInfo(), hrsOther.getServerName());
-    // Assert that there is a ReplicationSource in both places now.
-    assertTrue(isMetaRegionReplicaReplicationSource(hrs));
-    assertTrue(isMetaRegionReplicaReplicationSource(hrsOther));
-    // Replicate to show stuff still works.
-    testHBaseMetaReplicatesOneRow(1);
-    // Now pretend a few hours have gone by... roll the meta WAL in original location... Move the
-    // meta back and retry replication. See if it works.
-    hrs.getWAL(meta.getRegionInfo()).rollWriter(true);
-    testHBaseMetaReplicatesOneRow(2);
-    hrs.getWAL(meta.getRegionInfo()).rollWriter(true);
-    testHBaseMetaReplicatesOneRow(3);
-  }
-
-  /**
-   * Test meta region replica replication. Create some tables and see if replicas pick up the
-   * additions.
-   */
-  private void testHBaseMetaReplicatesOneRow(int i) throws Exception {
-    waitForMetaReplicasToOnline();
-    try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_" + i),
-      HConstants.CATALOG_FAMILY)) {
-      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
-    }
-  }
-
-  /**
-   * @return Whether the special meta region replica peer is enabled on <code>hrs</code>
-   */
-  private boolean isMetaRegionReplicaReplicationSource(HRegionServer hrs) {
-    return hrs.getReplicationSourceService().getReplicationManager().
-      catalogReplicationSource.get() != null;
-  }
-
-  /**
    * Test meta region replica replication. Create some tables and see if replicas pick up the
    * additions.
    */
   @Test
   public void testHBaseMetaReplicates() throws Exception {
-    try (Table table = HTU
-      .createTable(TableName.valueOf(this.name.getMethodName() + "_0"), HConstants.CATALOG_FAMILY,
-        Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
+    try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_0"),
+      HConstants.CATALOG_FAMILY,
+      Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
       verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
     }
-    try (Table table = HTU
-      .createTable(TableName.valueOf(this.name.getMethodName() + "_1"), HConstants.CATALOG_FAMILY,
-        Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
+    try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_1"),
+      HConstants.CATALOG_FAMILY,
+      Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
       verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
       // Try delete.
       HTU.deleteTableIfAny(table.getName());
@@ -207,26 +137,22 @@ public class TestMetaRegionReplicaReplicationEndpoint {
 
   @Test
   public void testCatalogReplicaReplicationWithFlushAndCompaction() throws Exception {
-    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
-    TableName tableName = TableName.valueOf("hbase:meta");
-    Table table = connection.getTable(tableName);
-    try {
+    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+      Table table = connection.getTable(TableName.META_TABLE_NAME)) {
       // load the data to the table
       for (int i = 0; i < 5; i++) {
         LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
         HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
         LOG.info("flushing table");
-        HTU.flush(tableName);
+        HTU.flush(TableName.META_TABLE_NAME);
         LOG.info("compacting table");
         if (i < 4) {
-          HTU.compact(tableName, false);
+          HTU.compact(TableName.META_TABLE_NAME, false);
         }
       }
 
-      verifyReplication(tableName, numOfMetaReplica, 0, 5000, HConstants.CATALOG_FAMILY);
-    } finally {
-      table.close();
-      connection.close();
+      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, 0, 5000,
+        HConstants.CATALOG_FAMILY);
     }
   }
 
@@ -235,7 +161,6 @@ public class TestMetaRegionReplicaReplicationEndpoint {
     SingleProcessHBaseCluster cluster = HTU.getMiniHBaseCluster();
     HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta());
 
-    HRegionServer hrsMetaReplica = null;
     HRegionServer hrsNoMetaReplica = null;
     HRegionServer server = null;
     Region metaReplica = null;
@@ -260,11 +185,8 @@ public class TestMetaRegionReplicaReplicationEndpoint {
         hrsNoMetaReplica = server;
       }
     }
-
-    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
-    TableName tableName = TableName.valueOf("hbase:meta");
-    Table table = connection.getTable(tableName);
-    try {
+    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+      Table table = connection.getTable(TableName.META_TABLE_NAME)) {
       // load the data to the table
       for (int i = 0; i < 5; i++) {
         LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
@@ -274,10 +196,8 @@ public class TestMetaRegionReplicaReplicationEndpoint {
         }
       }
 
-      verifyReplication(tableName, numOfMetaReplica, 0, 5000, HConstants.CATALOG_FAMILY);
-    } finally {
-      table.close();
-      connection.close();
+      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, 0, 5000,
+        HConstants.CATALOG_FAMILY);
     }
   }
 
@@ -324,22 +244,6 @@ public class TestMetaRegionReplicaReplicationEndpoint {
   }
 
   /**
-   * Replicas come online after primary.
-   */
-  private void waitForMetaReplicasToOnline() throws IOException {
-    final RegionLocator regionLocator =
-      HTU.getConnection().getRegionLocator(TableName.META_TABLE_NAME);
-    HTU.waitFor(10000,
-      // getRegionLocations returns an entry for each replica but if unassigned, entry is null.
-      // Pass reload to force us to skip cache else it just keeps returning default.
-      () -> regionLocator.getRegionLocations(HConstants.EMPTY_START_ROW, true).stream().
-        filter(Objects::nonNull).count() >= numOfMetaReplica);
-    List<HRegionLocation> locations = regionLocator.getRegionLocations(HConstants.EMPTY_START_ROW);
-    LOG.info("Found locations {}", locations);
-    assertEquals(numOfMetaReplica, locations.size());
-  }
-
-  /**
    * Scan hbase:meta for <code>tableName</code> content.
    */
   private List<Result> getMetaCells(TableName tableName) throws IOException {
@@ -373,20 +277,9 @@ public class TestMetaRegionReplicaReplicationEndpoint {
     return regions;
   }
 
-  private Region getOneRegion(TableName tableName) {
-    for (int i = 0; i < NB_SERVERS; i++) {
-      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
-      List<HRegion> onlineRegions = rs.getRegions(tableName);
-      if (onlineRegions.size() > 1) {
-        return onlineRegions.get(0);
-      }
-    }
-    return null;
-  }
-
   /**
-   * Verify when a Table is deleted from primary, then there are no references in replicas
-   * (because they get the delete of the table rows too).
+   * Verify when a Table is deleted from primary, then there are no references in replicas (because
+   * they get the delete of the table rows too).
    */
   private void verifyDeletedReplication(TableName tableName, int regionReplication,
     final TableName deletedTableName) {
@@ -417,8 +310,8 @@ public class TestMetaRegionReplicaReplicationEndpoint {
   }
 
   /**
-   * Cells are from hbase:meta replica so will start w/ 'tableName,'; i.e. the tablename followed
-   * by HConstants.DELIMITER. Make sure the deleted table is no longer present in passed
+   * Cells are from hbase:meta replica so will start w/ 'tableName,'; i.e. the tablename followed by
+   * HConstants.DELIMITER. Make sure the deleted table is no longer present in passed
    * <code>cells</code>.
    */
   private boolean doesNotContain(List<Cell> cells, TableName tableName) {
@@ -491,21 +384,19 @@ public class TestMetaRegionReplicaReplicationEndpoint {
   }
 
   private void primaryNoChangeReplicaIncrease(final long[] before, final long[] after) {
-    assertEquals(before[RegionInfo.DEFAULT_REPLICA_ID],
-      after[RegionInfo.DEFAULT_REPLICA_ID]);
+    assertEquals(before[RegionInfo.DEFAULT_REPLICA_ID], after[RegionInfo.DEFAULT_REPLICA_ID]);
 
-    for (int i = 1; i < after.length; i ++) {
+    for (int i = 1; i < after.length; i++) {
       assertTrue(after[i] > before[i]);
     }
   }
 
   private void primaryIncreaseReplicaNoChange(final long[] before, final long[] after) {
     // There are read requests increase for primary meta replica.
-    assertTrue(after[RegionInfo.DEFAULT_REPLICA_ID] >
-      before[RegionInfo.DEFAULT_REPLICA_ID]);
+    assertTrue(after[RegionInfo.DEFAULT_REPLICA_ID] > before[RegionInfo.DEFAULT_REPLICA_ID]);
 
     // No change for replica regions
-    for (int i = 1; i < after.length; i ++) {
+    for (int i = 1; i < after.length; i++) {
       assertEquals(before[i], after[i]);
     }
   }
@@ -515,13 +406,12 @@ public class TestMetaRegionReplicaReplicationEndpoint {
     for (Region r : metaRegions) {
       LOG.info("read request for region {} is {}", r, r.getReadRequestsCount());
       counters[i] = r.getReadRequestsCount();
-      i ++;
+      i++;
     }
   }
 
   @Test
   public void testHBaseMetaReplicaGets() throws Exception {
-
     TableName tn = TableName.valueOf(this.name.getMethodName());
     final Region[] metaRegions = getAllRegions(TableName.META_TABLE_NAME, numOfMetaReplica);
     long[] readReqsForMetaReplicas = new long[numOfMetaReplica];
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
new file mode 100644
index 0000000..7dd4255
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.testclassification.FlakeyTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
+
+/**
+ * Tests RegionReplicaReplicationEndpoint class by setting up region replicas and verifying
+ * async wal replication replays the edits to the secondary region in various scenarios.
+ */
+@Category({FlakeyTests.class, LargeTests.class})
+public class TestRegionReplicaReplication {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestRegionReplicaReplication.class);
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestRegionReplicaReplication.class);
+
+  private static final int NB_SERVERS = 2;
+
+  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
+
+  @Rule
+  public TestName name = new TestName();
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    Configuration conf = HTU.getConfiguration();
+    conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
+    conf.setInt("replication.source.size.capacity", 10240);
+    conf.setLong("replication.source.sleepforretries", 100);
+    conf.setInt("hbase.regionserver.maxlogs", 10);
+    conf.setLong("hbase.master.logcleaner.ttl", 10);
+    conf.setInt("zookeeper.recovery.retry", 1);
+    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
+    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
+    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+    conf.setInt("replication.stats.thread.period.seconds", 5);
+    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // less number of retries is needed
+    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
+
+    HTU.startMiniCluster(NB_SERVERS);
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    HTU.shutdownMiniCluster();
+  }
+
+  private void testRegionReplicaReplication(int regionReplication) throws Exception {
+    // test region replica replication. Create a table with single region, write some data
+    // ensure that data is replicated to the secondary region
+    TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_"
+        + regionReplication);
+    TableDescriptor htd = HTU
+      .createModifyableTableDescriptor(TableName.valueOf(tableName.toString()),
+        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
+        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
+      .setRegionReplication(regionReplication).build();
+    createOrEnableTableWithRetries(htd, true);
+    TableName tableNameNoReplicas =
+        TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS");
+    HTU.deleteTableIfAny(tableNameNoReplicas);
+    HTU.createTable(tableNameNoReplicas, HBaseTestingUtil.fam1);
+
+    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+      Table table = connection.getTable(tableName);
+      Table tableNoReplicas = connection.getTable(tableNameNoReplicas)) {
+      // load some data to the non-replicated table
+      HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtil.fam1, 6000, 7000);
+
+      // load the data to the table
+      HTU.loadNumericRows(table, HBaseTestingUtil.fam1, 0, 1000);
+
+      verifyReplication(tableName, regionReplication, 0, 1000);
+    } finally {
+      HTU.deleteTableIfAny(tableNameNoReplicas);
+    }
+  }
+
+  private void verifyReplication(TableName tableName, int regionReplication,
+      final int startRow, final int endRow) throws Exception {
+    verifyReplication(tableName, regionReplication, startRow, endRow, true);
+  }
+
+  private void verifyReplication(TableName tableName, int regionReplication,
+      final int startRow, final int endRow, final boolean present) throws Exception {
+    // find the regions
+    final Region[] regions = new Region[regionReplication];
+
+    for (int i=0; i < NB_SERVERS; i++) {
+      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
+      List<HRegion> onlineRegions = rs.getRegions(tableName);
+      for (HRegion region : onlineRegions) {
+        regions[region.getRegionInfo().getReplicaId()] = region;
+      }
+    }
+
+    for (Region region : regions) {
+      assertNotNull(region);
+    }
+
+    for (int i = 1; i < regionReplication; i++) {
+      final Region region = regions[i];
+      // wait until all the data is replicated to all secondary regions
+      Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          LOG.info("verifying replication for region replica:" + region.getRegionInfo());
+          try {
+            HTU.verifyNumericRows(region, HBaseTestingUtil.fam1, startRow, endRow, present);
+          } catch(Throwable ex) {
+            LOG.warn("Verification from secondary region is not complete yet", ex);
+            // still wait
+            return false;
+          }
+          return true;
+        }
+      });
+    }
+  }
+
+  @Test
+  public void testRegionReplicaReplicationWith2Replicas() throws Exception {
+    testRegionReplicaReplication(2);
+  }
+
+  @Test
+  public void testRegionReplicaReplicationWith3Replicas() throws Exception {
+    testRegionReplicaReplication(3);
+  }
+
+  @Test
+  public void testRegionReplicaReplicationWith10Replicas() throws Exception {
+    testRegionReplicaReplication(10);
+  }
+
+  @Test
+  public void testRegionReplicaWithoutMemstoreReplication() throws Exception {
+    int regionReplication = 3;
+    TableDescriptor htd = HTU.createModifyableTableDescriptor(name.getMethodName())
+      .setRegionReplication(regionReplication).setRegionMemStoreReplication(false).build();
+    createOrEnableTableWithRetries(htd, true);
+    final TableName tableName = htd.getTableName();
+    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+    Table table = connection.getTable(tableName);
+    try {
+      // write data to the primary. The replicas should not receive the data
+      final int STEP = 100;
+      for (int i = 0; i < 3; ++i) {
+        final int startRow = i * STEP;
+        final int endRow = (i + 1) * STEP;
+        LOG.info("Writing data from " + startRow + " to " + endRow);
+        HTU.loadNumericRows(table, HBaseTestingUtil.fam1, startRow, endRow);
+        verifyReplication(tableName, regionReplication, startRow, endRow, false);
+
+        // Flush the table, now the data should show up in the replicas
+        LOG.info("flushing table");
+        HTU.flush(tableName);
+        verifyReplication(tableName, regionReplication, 0, endRow, true);
+      }
+    } finally {
+      table.close();
+      connection.close();
+    }
+  }
+
+  @Test
+  public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception {
+    // Tests a table with region replication 3. Writes some data, and causes flushes and
+    // compactions. Verifies that the data is readable from the replicas. Note that this
+    // does not test whether the replicas actually pick up flushed files and apply compaction
+    // to their stores
+    int regionReplication = 3;
+    TableDescriptor htd = HTU.createModifyableTableDescriptor(name.getMethodName())
+      .setRegionReplication(regionReplication).build();
+    createOrEnableTableWithRetries(htd, true);
+    final TableName tableName = htd.getTableName();
+
+    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+    Table table = connection.getTable(tableName);
+    try {
+      // load the data to the table
+
+      for (int i = 0; i < 6000; i += 1000) {
+        LOG.info("Writing data from " + i + " to " + (i+1000));
+        HTU.loadNumericRows(table, HBaseTestingUtil.fam1, i, i+1000);
+        LOG.info("flushing table");
+        HTU.flush(tableName);
+        LOG.info("compacting table");
+        HTU.compact(tableName, false);
+      }
+
+      verifyReplication(tableName, regionReplication, 0, 1000);
+    } finally {
+      table.close();
+      connection.close();
+    }
+  }
+
+  private void createOrEnableTableWithRetries(TableDescriptor htd, boolean createTableOperation) {
+    // Helper function to run create/enable table operations with a retry feature
+    boolean continueToRetry = true;
+    int tries = 0;
+    while (continueToRetry && tries < 50) {
+      try {
+        continueToRetry = false;
+        if (createTableOperation) {
+          HTU.getAdmin().createTable(htd);
+        } else {
+          HTU.getAdmin().enableTable(htd.getTableName());
+        }
+      } catch (IOException e) {
+        if (e.getCause() instanceof ReplicationException) {
+          continueToRetry = true;
+          tries++;
+          Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+        }
+      }
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
deleted file mode 100644
index d238e09..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
+++ /dev/null
@@ -1,515 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.Cell.Type;
-import org.apache.hadoop.hbase.CellBuilderFactory;
-import org.apache.hadoop.hbase.CellBuilderType;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.testclassification.FlakeyTests;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.WALKeyImpl;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
-
-/**
- * Tests RegionReplicaReplicationEndpoint class by setting up region replicas and verifying
- * async wal replication replays the edits to the secondary region in various scenarios.
- */
-@Category({FlakeyTests.class, LargeTests.class})
-public class TestRegionReplicaReplicationEndpoint {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestRegionReplicaReplicationEndpoint.class);
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestRegionReplicaReplicationEndpoint.class);
-
-  private static final int NB_SERVERS = 2;
-
-  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
-
-  @Rule
-  public TestName name = new TestName();
-
-  @BeforeClass
-  public static void beforeClass() throws Exception {
-    Configuration conf = HTU.getConfiguration();
-    conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
-    conf.setInt("replication.source.size.capacity", 10240);
-    conf.setLong("replication.source.sleepforretries", 100);
-    conf.setInt("hbase.regionserver.maxlogs", 10);
-    conf.setLong("hbase.master.logcleaner.ttl", 10);
-    conf.setInt("zookeeper.recovery.retry", 1);
-    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
-    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
-    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
-    conf.setInt("replication.stats.thread.period.seconds", 5);
-    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
-    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // less number of retries is needed
-    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
-
-    HTU.startMiniCluster(NB_SERVERS);
-  }
-
-  @AfterClass
-  public static void afterClass() throws Exception {
-    HTU.shutdownMiniCluster();
-  }
-
-  @Test
-  public void testRegionReplicaReplicationPeerIsCreated() throws IOException {
-    // create a table with region replicas. Check whether the replication peer is created
-    // and replication started.
-    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
-      Admin admin = connection.getAdmin()) {
-      String peerId = ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER;
-
-      ReplicationPeerConfig peerConfig = null;
-      try {
-        peerConfig = admin.getReplicationPeerConfig(peerId);
-      } catch (ReplicationPeerNotFoundException e) {
-        LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
-      }
-
-      if (peerConfig != null) {
-        admin.removeReplicationPeer(peerId);
-        peerConfig = null;
-      }
-
-      TableDescriptor htd = HTU
-        .createTableDescriptor(TableName.valueOf("testReplicationPeerIsCreated_no_region_replicas"),
-          ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
-          ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
-      createOrEnableTableWithRetries(htd, true);
-      try {
-        peerConfig = admin.getReplicationPeerConfig(peerId);
-        fail("Should throw ReplicationException, because replication peer id=" + peerId
-          + " not exist");
-      } catch (ReplicationPeerNotFoundException e) {
-      }
-      assertNull(peerConfig);
-
-      htd = HTU.createModifyableTableDescriptor(TableName.valueOf("testReplicationPeerIsCreated"),
-        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
-        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED).setRegionReplication(2).build();
-      createOrEnableTableWithRetries(htd, true);
-
-      // assert peer configuration is correct
-      peerConfig = admin.getReplicationPeerConfig(peerId);
-      assertNotNull(peerConfig);
-      assertEquals(peerConfig.getClusterKey(),
-        ZKConfig.getZooKeeperClusterKey(HTU.getConfiguration()));
-      assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
-        peerConfig.getReplicationEndpointImpl());
-    }
-  }
-
-  @Test
-  public void testRegionReplicaReplicationPeerIsCreatedForModifyTable() throws Exception {
-    // modify a table by adding region replicas. Check whether the replication peer is created
-    // and replication started.
-    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
-      Admin admin = connection.getAdmin()) {
-      String peerId = "region_replica_replication";
-
-      ReplicationPeerConfig peerConfig = null;
-      try {
-        peerConfig = admin.getReplicationPeerConfig(peerId);
-      } catch (ReplicationPeerNotFoundException e) {
-        LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
-      }
-
-      if (peerConfig != null) {
-        admin.removeReplicationPeer(peerId);
-        peerConfig = null;
-      }
-
-      TableDescriptor htd = HTU.createTableDescriptor(
-        TableName.valueOf("testRegionReplicaReplicationPeerIsCreatedForModifyTable"),
-        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
-        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
-      createOrEnableTableWithRetries(htd, true);
-
-      // assert that replication peer is not created yet
-      try {
-        peerConfig = admin.getReplicationPeerConfig(peerId);
-        fail("Should throw ReplicationException, because replication peer id=" + peerId
-          + " not exist");
-      } catch (ReplicationPeerNotFoundException e) {
-      }
-      assertNull(peerConfig);
-
-      HTU.getAdmin().disableTable(htd.getTableName());
-      htd = TableDescriptorBuilder.newBuilder(htd).setRegionReplication(2).build();
-      HTU.getAdmin().modifyTable(htd);
-      createOrEnableTableWithRetries(htd, false);
-
-      // assert peer configuration is correct
-      peerConfig = admin.getReplicationPeerConfig(peerId);
-      assertNotNull(peerConfig);
-      assertEquals(peerConfig.getClusterKey(),
-        ZKConfig.getZooKeeperClusterKey(HTU.getConfiguration()));
-      assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
-        peerConfig.getReplicationEndpointImpl());
-    }
-  }
-
-  public void testRegionReplicaReplication(int regionReplication) throws Exception {
-    // test region replica replication. Create a table with single region, write some data
-    // ensure that data is replicated to the secondary region
-    TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_"
-        + regionReplication);
-    TableDescriptor htd = HTU
-      .createModifyableTableDescriptor(TableName.valueOf(tableName.toString()),
-        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
-        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
-      .setRegionReplication(regionReplication).build();
-    createOrEnableTableWithRetries(htd, true);
-    TableName tableNameNoReplicas =
-        TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS");
-    HTU.deleteTableIfAny(tableNameNoReplicas);
-    HTU.createTable(tableNameNoReplicas, HBaseTestingUtil.fam1);
-
-    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
-    Table table = connection.getTable(tableName);
-    Table tableNoReplicas = connection.getTable(tableNameNoReplicas);
-
-    try {
-      // load some data to the non-replicated table
-      HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtil.fam1, 6000, 7000);
-
-      // load the data to the table
-      HTU.loadNumericRows(table, HBaseTestingUtil.fam1, 0, 1000);
-
-      verifyReplication(tableName, regionReplication, 0, 1000);
-
-    } finally {
-      table.close();
-      tableNoReplicas.close();
-      HTU.deleteTableIfAny(tableNameNoReplicas);
-      connection.close();
-    }
-  }
-
-  private void verifyReplication(TableName tableName, int regionReplication,
-      final int startRow, final int endRow) throws Exception {
-    verifyReplication(tableName, regionReplication, startRow, endRow, true);
-  }
-
-  private void verifyReplication(TableName tableName, int regionReplication,
-      final int startRow, final int endRow, final boolean present) throws Exception {
-    // find the regions
-    final Region[] regions = new Region[regionReplication];
-
-    for (int i=0; i < NB_SERVERS; i++) {
-      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
-      List<HRegion> onlineRegions = rs.getRegions(tableName);
-      for (HRegion region : onlineRegions) {
-        regions[region.getRegionInfo().getReplicaId()] = region;
-      }
-    }
-
-    for (Region region : regions) {
-      assertNotNull(region);
-    }
-
-    for (int i = 1; i < regionReplication; i++) {
-      final Region region = regions[i];
-      // wait until all the data is replicated to all secondary regions
-      Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          LOG.info("verifying replication for region replica:" + region.getRegionInfo());
-          try {
-            HTU.verifyNumericRows(region, HBaseTestingUtil.fam1, startRow, endRow, present);
-          } catch(Throwable ex) {
-            LOG.warn("Verification from secondary region is not complete yet", ex);
-            // still wait
-            return false;
-          }
-          return true;
-        }
-      });
-    }
-  }
-
-  @Test
-  public void testRegionReplicaReplicationWith2Replicas() throws Exception {
-    testRegionReplicaReplication(2);
-  }
-
-  @Test
-  public void testRegionReplicaReplicationWith3Replicas() throws Exception {
-    testRegionReplicaReplication(3);
-  }
-
-  @Test
-  public void testRegionReplicaReplicationWith10Replicas() throws Exception {
-    testRegionReplicaReplication(10);
-  }
-
-  @Test
-  public void testRegionReplicaWithoutMemstoreReplication() throws Exception {
-    int regionReplication = 3;
-    TableDescriptor htd = HTU.createModifyableTableDescriptor(name.getMethodName())
-      .setRegionReplication(regionReplication).setRegionMemStoreReplication(false).build();
-    createOrEnableTableWithRetries(htd, true);
-    final TableName tableName = htd.getTableName();
-    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
-    Table table = connection.getTable(tableName);
-    try {
-      // write data to the primary. The replicas should not receive the data
-      final int STEP = 100;
-      for (int i = 0; i < 3; ++i) {
-        final int startRow = i * STEP;
-        final int endRow = (i + 1) * STEP;
-        LOG.info("Writing data from " + startRow + " to " + endRow);
-        HTU.loadNumericRows(table, HBaseTestingUtil.fam1, startRow, endRow);
-        verifyReplication(tableName, regionReplication, startRow, endRow, false);
-
-        // Flush the table, now the data should show up in the replicas
-        LOG.info("flushing table");
-        HTU.flush(tableName);
-        verifyReplication(tableName, regionReplication, 0, endRow, true);
-      }
-    } finally {
-      table.close();
-      connection.close();
-    }
-  }
-
-  @Test
-  public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception {
-    // Tests a table with region replication 3. Writes some data, and causes flushes and
-    // compactions. Verifies that the data is readable from the replicas. Note that this
-    // does not test whether the replicas actually pick up flushed files and apply compaction
-    // to their stores
-    int regionReplication = 3;
-    TableDescriptor htd = HTU.createModifyableTableDescriptor(name.getMethodName())
-      .setRegionReplication(regionReplication).build();
-    createOrEnableTableWithRetries(htd, true);
-    final TableName tableName = htd.getTableName();
-
-    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
-    Table table = connection.getTable(tableName);
-    try {
-      // load the data to the table
-
-      for (int i = 0; i < 6000; i += 1000) {
-        LOG.info("Writing data from " + i + " to " + (i+1000));
-        HTU.loadNumericRows(table, HBaseTestingUtil.fam1, i, i+1000);
-        LOG.info("flushing table");
-        HTU.flush(tableName);
-        LOG.info("compacting table");
-        HTU.compact(tableName, false);
-      }
-
-      verifyReplication(tableName, regionReplication, 0, 1000);
-    } finally {
-      table.close();
-      connection.close();
-    }
-  }
-
-  @Test
-  public void testRegionReplicaReplicationIgnoresDisabledTables() throws Exception {
-    testRegionReplicaReplicationIgnores(false, false);
-  }
-
-  @Test
-  public void testRegionReplicaReplicationIgnoresDroppedTables() throws Exception {
-    testRegionReplicaReplicationIgnores(true, false);
-  }
-
-  @Test
-  public void testRegionReplicaReplicationIgnoresNonReplicatedTables() throws Exception {
-    testRegionReplicaReplicationIgnores(false, true);
-  }
-
-  private void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication)
-      throws Exception {
-    // tests having edits from a disabled or dropped table is handled correctly by skipping those
-    // entries and further edits after the edits from dropped/disabled table can be replicated
-    // without problems.
-    int regionReplication = 3;
-    TableDescriptor htd = HTU
-      .createModifyableTableDescriptor(
-        name.getMethodName() + "_drop_" + dropTable + "_disabledReplication_" + disableReplication)
-      .setRegionReplication(regionReplication).build();
-    final TableName tableName = htd.getTableName();
-    HTU.deleteTableIfAny(tableName);
-
-    createOrEnableTableWithRetries(htd, true);
-    TableName toBeDisabledTable = TableName.valueOf(
-      dropTable ? "droppedTable" : (disableReplication ? "disableReplication" : "disabledTable"));
-    HTU.deleteTableIfAny(toBeDisabledTable);
-    htd = HTU
-      .createModifyableTableDescriptor(toBeDisabledTable,
-        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
-        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
-      .setRegionReplication(regionReplication).build();
-    createOrEnableTableWithRetries(htd, true);
-
-    // both tables are created, now pause replication
-    HTU.getAdmin().disableReplicationPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
-
-    // now that the replication is disabled, write to the table to be dropped, then drop the table.
-
-    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
-    Table table = connection.getTable(tableName);
-    Table tableToBeDisabled = connection.getTable(toBeDisabledTable);
-
-    HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtil.fam1, 6000, 7000);
-
-    RegionLocator rl = connection.getRegionLocator(toBeDisabledTable);
-    HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
-    byte[] encodedRegionName = hrl.getRegion().getEncodedNameAsBytes();
-
-    Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("A"))
-        .setFamily(HTU.fam1).setValue(Bytes.toBytes("VAL")).setType(Type.Put).build();
-    Entry entry = new Entry(
-      new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1),
-        new WALEdit()
-            .add(cell));
-    HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
-    if (dropTable) {
-      HTU.getAdmin().deleteTable(toBeDisabledTable);
-    } else if (disableReplication) {
-      htd =
-        TableDescriptorBuilder.newBuilder(htd).setRegionReplication(regionReplication - 2).build();
-      HTU.getAdmin().modifyTable(htd);
-      createOrEnableTableWithRetries(htd, false);
-    }
-
-    HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(0);
-    MetricsSource metrics = mock(MetricsSource.class);
-    ReplicationEndpoint.Context ctx =
-      new ReplicationEndpoint.Context(rs, HTU.getConfiguration(), HTU.getConfiguration(),
-        HTU.getTestFileSystem(), ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER,
-        UUID.fromString(rs.getClusterId()), rs.getReplicationSourceService().getReplicationPeers()
-          .getPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER),
-        metrics, rs.getTableDescriptors(), rs);
-    RegionReplicaReplicationEndpoint rrpe = new RegionReplicaReplicationEndpoint();
-    rrpe.init(ctx);
-    rrpe.start();
-    ReplicationEndpoint.ReplicateContext repCtx = new ReplicationEndpoint.ReplicateContext();
-    repCtx.setEntries(Lists.newArrayList(entry, entry));
-    assertTrue(rrpe.replicate(repCtx));
-    verify(metrics, times(1)).incrLogEditsFiltered(eq(2L));
-    rrpe.stop();
-    if (disableReplication) {
-      // enable replication again so that we can verify replication
-      HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
-      htd = TableDescriptorBuilder.newBuilder(htd).setRegionReplication(regionReplication).build();
-      HTU.getAdmin().modifyTable(htd);
-      createOrEnableTableWithRetries(htd, false);
-    }
-
-    try {
-      // load some data to the to-be-dropped table
-      // load the data to the table
-      HTU.loadNumericRows(table, HBaseTestingUtil.fam1, 0, 1000);
-
-      // now enable the replication
-      HTU.getAdmin().enableReplicationPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
-
-      verifyReplication(tableName, regionReplication, 0, 1000);
-    } finally {
-      table.close();
-      rl.close();
-      tableToBeDisabled.close();
-      HTU.deleteTableIfAny(toBeDisabledTable);
-      connection.close();
-    }
-  }
-
-  private void createOrEnableTableWithRetries(TableDescriptor htd, boolean createTableOperation) {
-    // Helper function to run create/enable table operations with a retry feature
-    boolean continueToRetry = true;
-    int tries = 0;
-    while (continueToRetry && tries < 50) {
-      try {
-        continueToRetry = false;
-        if (createTableOperation) {
-          HTU.getAdmin().createTable(htd);
-        } else {
-          HTU.getAdmin().enableTable(htd.getTableName());
-        }
-      } catch (IOException e) {
-        if (e.getCause() instanceof ReplicationException) {
-          continueToRetry = true;
-          tries++;
-          Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
-        }
-      }
-    }
-  }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
deleted file mode 100644
index a37ab5f..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
-import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Optional;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.StartTestingClusterOption;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.AsyncClusterConnection;
-import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.WALCoprocessor;
-import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.WALObserver;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
-import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.wal.WALKeyImpl;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-
-/**
- * Tests RegionReplicaReplicationEndpoint. Unlike TestRegionReplicaReplicationEndpoint this
- * class contains lower level tests using callables.
- */
-@Category({ReplicationTests.class, MediumTests.class})
-public class TestRegionReplicaReplicationEndpointNoMaster {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestRegionReplicaReplicationEndpointNoMaster.class);
-
-  private static final int NB_SERVERS = 2;
-  private static TableName tableName = TableName.valueOf(
-    TestRegionReplicaReplicationEndpointNoMaster.class.getSimpleName());
-  private static Table table;
-  private static final byte[] row = Bytes.toBytes("TestRegionReplicaReplicator");
-
-  private static HRegionServer rs0;
-  private static HRegionServer rs1;
-
-  private static RegionInfo hriPrimary;
-  private static RegionInfo hriSecondary;
-
-  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
-  private static final byte[] f = HConstants.CATALOG_FAMILY;
-
-  @BeforeClass
-  public static void beforeClass() throws Exception {
-    Configuration conf = HTU.getConfiguration();
-    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
-    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false);
-
-    // install WALObserver coprocessor for tests
-    String walCoprocs = HTU.getConfiguration().get(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY);
-    if (walCoprocs == null) {
-      walCoprocs = WALEditCopro.class.getName();
-    } else {
-      walCoprocs += "," + WALEditCopro.class.getName();
-    }
-    HTU.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
-      walCoprocs);
-    StartTestingClusterOption option = StartTestingClusterOption.builder()
-      .numAlwaysStandByMasters(1).numRegionServers(NB_SERVERS).numDataNodes(NB_SERVERS).build();
-    HTU.startMiniCluster(option);
-
-    // Create table then get the single region for our new table.
-    TableDescriptor htd = HTU.createTableDescriptor(TableName.valueOf(tableName.getNameAsString()),
-      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
-      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
-    table = HTU.createTable(htd, new byte[][]{f}, null);
-
-    try (RegionLocator locator = HTU.getConnection().getRegionLocator(tableName)) {
-      hriPrimary = locator.getRegionLocation(row, false).getRegion();
-    }
-
-    // mock a secondary region info to open
-    hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);
-
-    // No master
-    TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
-    rs0 = HTU.getMiniHBaseCluster().getRegionServer(0);
-    rs1 = HTU.getMiniHBaseCluster().getRegionServer(1);
-  }
-
-  @AfterClass
-  public static void afterClass() throws Exception {
-    HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
-    table.close();
-    HTU.shutdownMiniCluster();
-  }
-
-  @Before
-  public void before() throws Exception {
-    entries.clear();
-  }
-
-  @After
-  public void after() throws Exception {
-  }
-
-  static ConcurrentLinkedQueue<Entry> entries = new ConcurrentLinkedQueue<>();
-
-  public static class WALEditCopro implements WALCoprocessor, WALObserver {
-    public WALEditCopro() {
-      entries.clear();
-    }
-
-    @Override
-    public Optional<WALObserver> getWALObserver() {
-      return Optional.of(this);
-    }
-
-    @Override
-    public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
-                             RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
-      // only keep primary region's edits
-      if (logKey.getTableName().equals(tableName) && info.getReplicaId() == 0) {
-        // Presume type is a WALKeyImpl
-        entries.add(new Entry((WALKeyImpl)logKey, logEdit));
-      }
-    }
-  }
-
-  @Test
-  public void testReplayCallable() throws Exception {
-    // tests replaying the edits to a secondary region replica using the Callable directly
-    openRegion(HTU, rs0, hriSecondary);
-
-    // load some data to primary
-    HTU.loadNumericRows(table, f, 0, 1000);
-
-    Assert.assertEquals(1000, entries.size());
-    try (AsyncClusterConnection conn = ClusterConnectionFactory
-      .createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) {
-      // replay the edits to the secondary using replay callable
-      replicateUsingCallable(conn, entries);
-    }
-
-    Region region = rs0.getRegion(hriSecondary.getEncodedName());
-    HTU.verifyNumericRows(region, f, 0, 1000);
-
-    HTU.deleteNumericRows(table, f, 0, 1000);
-    closeRegion(HTU, rs0, hriSecondary);
-  }
-
-  private void replicateUsingCallable(AsyncClusterConnection connection, Queue<Entry> entries)
-      throws IOException, ExecutionException, InterruptedException {
-    Entry entry;
-    while ((entry = entries.poll()) != null) {
-      byte[] row = CellUtil.cloneRow(entry.getEdit().getCells().get(0));
-      RegionLocations locations = connection.getRegionLocations(tableName, row, true).get();
-      connection
-        .replay(tableName, locations.getRegionLocation(1).getRegion().getEncodedNameAsBytes(), row,
-          Collections.singletonList(entry), 1, Integer.MAX_VALUE, TimeUnit.SECONDS.toNanos(10))
-        .get();
-    }
-  }
-
-  @Test
-  public void testReplayCallableWithRegionMove() throws Exception {
-    // tests replaying the edits to a secondary region replica using the Callable directly while
-    // the region is moved to another location.It tests handling of RME.
-    try (AsyncClusterConnection conn = ClusterConnectionFactory
-      .createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) {
-      openRegion(HTU, rs0, hriSecondary);
-      // load some data to primary
-      HTU.loadNumericRows(table, f, 0, 1000);
-
-      Assert.assertEquals(1000, entries.size());
-
-      // replay the edits to the secondary using replay callable
-      replicateUsingCallable(conn, entries);
-
-      Region region = rs0.getRegion(hriSecondary.getEncodedName());
-      HTU.verifyNumericRows(region, f, 0, 1000);
-
-      HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary
-
-      // move the secondary region from RS0 to RS1
-      closeRegion(HTU, rs0, hriSecondary);
-      openRegion(HTU, rs1, hriSecondary);
-
-      // replicate the new data
-      replicateUsingCallable(conn, entries);
-
-      region = rs1.getRegion(hriSecondary.getEncodedName());
-      // verify the new data. old data may or may not be there
-      HTU.verifyNumericRows(region, f, 1000, 2000);
-
-      HTU.deleteNumericRows(table, f, 0, 2000);
-      closeRegion(HTU, rs1, hriSecondary);
-    }
-  }
-
-  @Test
-  public void testRegionReplicaReplicationEndpointReplicate() throws Exception {
-    // tests replaying the edits to a secondary region replica using the RRRE.replicate()
-    openRegion(HTU, rs0, hriSecondary);
-    RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint();
-
-    ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
-    when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
-    when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
-    when(context.getServer()).thenReturn(rs0);
-    when(context.getTableDescriptors()).thenReturn(rs0.getTableDescriptors());
-    replicator.init(context);
-    replicator.startAsync();
-
-    //load some data to primary
-    HTU.loadNumericRows(table, f, 0, 1000);
-
-    Assert.assertEquals(1000, entries.size());
-    // replay the edits to the secondary using replay callable
-    final String fakeWalGroupId = "fakeWALGroup";
-    replicator.replicate(new ReplicateContext().setEntries(Lists.newArrayList(entries))
-        .setWalGroupId(fakeWalGroupId));
-    replicator.stop();
-    Region region = rs0.getRegion(hriSecondary.getEncodedName());
-    HTU.verifyNumericRows(region, f, 0, 1000);
-
-    HTU.deleteNumericRows(table, f, 0, 1000);
-    closeRegion(HTU, rs0, hriSecondary);
-  }
-}