You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/11/23 14:37:11 UTC

[hbase] 01/08: HBASE-26407 Introduce a region replication sink for sinking WAL edits to secondary replicas directly (#3807)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-26233
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 6d5490bb87f672fa15ea748d2f0597377fc9a61f
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Tue Nov 2 08:42:29 2021 +0800

    HBASE-26407 Introduce a region replication sink for sinking WAL edits to secondary replicas directly (#3807)
    
    Signed-off-by: Xiaolin Ha <ha...@apache.org>
---
 .../hbase/client/AsyncClusterConnection.java       |  13 +-
 .../hbase/client/AsyncClusterConnectionImpl.java   |  17 +-
 .../AsyncRegionReplicaReplayRetryingCaller.java    | 147 ------
 .../AsyncRegionReplicationRetryingCaller.java      | 103 +++++
 .../org/apache/hadoop/hbase/ipc/RpcServer.java     |  11 +
 .../hadoop/hbase/master/janitor/MetaFixer.java     |   9 -
 .../master/procedure/CreateTableProcedure.java     |  11 -
 .../master/procedure/ModifyTableProcedure.java     |   9 -
 .../apache/hadoop/hbase/regionserver/HRegion.java  |  77 ++-
 .../apache/hadoop/hbase/regionserver/HStore.java   |   3 +-
 .../MultiVersionConcurrencyControl.java            |  24 +-
 .../hbase/regionserver/RegionReplicationSink.java  | 228 +++++++++
 .../regionserver/handler/AssignRegionHandler.java  |  12 +-
 .../handler/UnassignRegionHandler.java             |  12 +-
 .../hbase/regionserver/wal/AbstractFSWAL.java      |   3 +-
 .../hadoop/hbase/regionserver/wal/WALUtil.java     |  44 +-
 .../regionserver/CatalogReplicationSource.java     |  47 --
 .../regionserver/CatalogReplicationSourcePeer.java |  50 --
 .../RegionReplicaReplicationEndpoint.java          | 407 ----------------
 .../regionserver/ReplicationSourceFactory.java     |   1 -
 .../regionserver/ReplicationSourceManager.java     |  87 ----
 .../hadoop/hbase/util/ServerRegionReplicaUtil.java |  32 --
 .../org/apache/hadoop/hbase/TestIOFencing.java     |   4 +-
 .../hbase/client/DummyAsyncClusterConnection.java  |  13 +-
 .../hadoop/hbase/regionserver/TestHRegion.java     |   2 +-
 .../regionserver/TestRegionReplicaFailover.java    |   4 +-
 ....java => TestMetaRegionReplicaReplication.java} | 184 ++------
 .../regionserver/TestRegionReplicaReplication.java | 273 +++++++++++
 .../TestRegionReplicaReplicationEndpoint.java      | 515 ---------------------
 ...stRegionReplicaReplicationEndpointNoMaster.java | 281 -----------
 30 files changed, 788 insertions(+), 1835 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index 9502424..11c4f4f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -58,13 +58,6 @@ public interface AsyncClusterConnection extends AsyncConnection {
   CompletableFuture<FlushRegionResponse> flush(byte[] regionName, boolean writeFlushWALMarker);
 
   /**
-   * Replicate wal edits for replica regions. The return value is the edits we skipped, as the
-   * original return value is useless.
-   */
-  CompletableFuture<Long> replay(TableName tableName, byte[] encodedRegionName, byte[] row,
-      List<Entry> entries, int replicaId, int numRetries, long operationTimeoutNs);
-
-  /**
    * Return all the replicas for a region. Used for region replica replication.
    */
   CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
@@ -110,4 +103,10 @@ public interface AsyncClusterConnection extends AsyncConnection {
    * Get the bootstrap node list of another region server.
    */
   CompletableFuture<List<ServerName>> getAllBootstrapNodes(ServerName regionServer);
+
+  /**
+   * Replicate wal edits to a secondary replica.
+   */
+  CompletableFuture<Void> replicate(RegionInfo replica, List<Entry> entries, int numRetries,
+    long rpcTimeoutNs, long operationTimeoutNs);
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
index 825fbb4..789d616 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
@@ -85,14 +85,6 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu
   }
 
   @Override
-  public CompletableFuture<Long> replay(TableName tableName, byte[] encodedRegionName, byte[] row,
-      List<Entry> entries, int replicaId, int retries, long operationTimeoutNs) {
-    return new AsyncRegionReplicaReplayRetryingCaller(RETRY_TIMER, this,
-      ConnectionUtils.retries2Attempts(retries), operationTimeoutNs, tableName, encodedRegionName,
-      row, entries, replicaId).call();
-  }
-
-  @Override
   public CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
       boolean reload) {
     return getLocator().getRegionLocations(tableName, row, RegionLocateType.CURRENT, reload, -1L);
@@ -176,4 +168,13 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu
       });
     return future;
   }
+
+  @Override
+  public CompletableFuture<Void> replicate(RegionInfo replica,
+    List<Entry> entries, int retries, long rpcTimeoutNs,
+    long operationTimeoutNs) {
+    return new AsyncRegionReplicationRetryingCaller(RETRY_TIMER, this,
+      ConnectionUtils.retries2Attempts(retries), rpcTimeoutNs, operationTimeoutNs, replica, entries)
+        .call();
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicaReplayRetryingCaller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicaReplayRetryingCaller.java
deleted file mode 100644
index 0146c8b..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicaReplayRetryingCaller.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
-
-import java.io.IOException;
-import java.util.List;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
-
-/**
- * For replaying edits for region replica.
- * <p/>
- * The mainly difference here is that, every time after locating, we will check whether the region
- * name is equal, if not, we will give up, as this usually means the region has been split or
- * merged, and the new region(s) should already have all the data of the parent region(s).
- * <p/>
- * Notice that, the return value is the edits we skipped, as the original response message is not
- * used at upper layer.
- */
-@InterfaceAudience.Private
-public class AsyncRegionReplicaReplayRetryingCaller extends AsyncRpcRetryingCaller<Long> {
-
-  private static final Logger LOG =
-    LoggerFactory.getLogger(AsyncRegionReplicaReplayRetryingCaller.class);
-
-  private final TableName tableName;
-
-  private final byte[] encodedRegionName;
-
-  private final byte[] row;
-
-  private final Entry[] entries;
-
-  private final int replicaId;
-
-  public AsyncRegionReplicaReplayRetryingCaller(HashedWheelTimer retryTimer,
-      AsyncClusterConnectionImpl conn, int maxAttempts, long operationTimeoutNs,
-      TableName tableName, byte[] encodedRegionName, byte[] row, List<Entry> entries,
-      int replicaId) {
-    super(retryTimer, conn, ConnectionUtils.getPriority(tableName), conn.connConf.getPauseNs(),
-      conn.connConf.getPauseForCQTBENs(), maxAttempts, operationTimeoutNs,
-      conn.connConf.getWriteRpcTimeoutNs(), conn.connConf.getStartLogErrorsCnt());
-    this.tableName = tableName;
-    this.encodedRegionName = encodedRegionName;
-    this.row = row;
-    this.entries = entries.toArray(new Entry[0]);
-    this.replicaId = replicaId;
-  }
-
-  private void call(HRegionLocation loc) {
-    if (!Bytes.equals(encodedRegionName, loc.getRegion().getEncodedNameAsBytes())) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(
-          "Skipping {} entries in table {} because located region {} is different than" +
-            " the original region {} from WALEdit",
-          entries.length, tableName, loc.getRegion().getEncodedName(),
-          Bytes.toStringBinary(encodedRegionName));
-        for (Entry entry : entries) {
-          LOG.trace("Skipping : " + entry);
-        }
-      }
-      future.complete(Long.valueOf(entries.length));
-      return;
-    }
-
-    AdminService.Interface stub;
-    try {
-      stub = conn.getAdminStub(loc.getServerName());
-    } catch (IOException e) {
-      onError(e,
-        () -> "Get async admin stub to " + loc.getServerName() + " for '" +
-          Bytes.toStringBinary(row) + "' in " + loc.getRegion().getEncodedName() + " of " +
-          tableName + " failed",
-        err -> conn.getLocator().updateCachedLocationOnError(loc, err));
-      return;
-    }
-    Pair<ReplicateWALEntryRequest, CellScanner> p = ReplicationProtobufUtil
-      .buildReplicateWALEntryRequest(entries, encodedRegionName, null, null, null);
-    resetCallTimeout();
-    controller.setCellScanner(p.getSecond());
-    stub.replay(controller, p.getFirst(), r -> {
-      if (controller.failed()) {
-        onError(controller.getFailed(),
-          () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " +
-            loc.getRegion().getEncodedName() + " of " + tableName + " failed",
-          err -> conn.getLocator().updateCachedLocationOnError(loc, err));
-      } else {
-        future.complete(0L);
-      }
-    });
-
-  }
-
-  @Override
-  protected void doCall() {
-    long locateTimeoutNs;
-    if (operationTimeoutNs > 0) {
-      locateTimeoutNs = remainingTimeNs();
-      if (locateTimeoutNs <= 0) {
-        completeExceptionally();
-        return;
-      }
-    } else {
-      locateTimeoutNs = -1L;
-    }
-    addListener(conn.getLocator().getRegionLocation(tableName, row, replicaId,
-      RegionLocateType.CURRENT, locateTimeoutNs), (loc, error) -> {
-        if (error != null) {
-          onError(error,
-            () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed", err -> {
-            });
-          return;
-        }
-        call(loc);
-      });
-  }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java
new file mode 100644
index 0000000..a0ce418
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+
+/**
+ * For replicating edits to secondary replicas.
+ */
+@InterfaceAudience.Private
+public class AsyncRegionReplicationRetryingCaller extends AsyncRpcRetryingCaller<Void> {
+
+  private final RegionInfo replica;
+
+  private final Entry[] entries;
+
+  public AsyncRegionReplicationRetryingCaller(HashedWheelTimer retryTimer,
+    AsyncClusterConnectionImpl conn, int maxAttempts, long rpcTimeoutNs, long operationTimeoutNs,
+    RegionInfo replica, List<Entry> entries) {
+    super(retryTimer, conn, ConnectionUtils.getPriority(replica.getTable()),
+      conn.connConf.getPauseNs(), conn.connConf.getPauseForCQTBENs(), maxAttempts,
+      operationTimeoutNs, rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt());
+    this.replica = replica;
+    this.entries = entries.toArray(new Entry[0]);
+  }
+
+  private void call(HRegionLocation loc) {
+    AdminService.Interface stub;
+    try {
+      stub = conn.getAdminStub(loc.getServerName());
+    } catch (IOException e) {
+      onError(e,
+        () -> "Get async admin stub to " + loc.getServerName() + " for " + replica + " failed",
+        err -> conn.getLocator().updateCachedLocationOnError(loc, err));
+      return;
+    }
+    Pair<ReplicateWALEntryRequest, CellScanner> pair = ReplicationProtobufUtil
+      .buildReplicateWALEntryRequest(entries, replica.getEncodedNameAsBytes(), null, null, null);
+    resetCallTimeout();
+    controller.setCellScanner(pair.getSecond());
+    stub.replay(controller, pair.getFirst(), r -> {
+      if (controller.failed()) {
+        onError(controller.getFailed(),
+          () -> "Call to " + loc.getServerName() + " for " + replica + " failed",
+          err -> conn.getLocator().updateCachedLocationOnError(loc, err));
+      } else {
+        future.complete(null);
+      }
+    });
+  }
+
+  @Override
+  protected void doCall() {
+    long locateTimeoutNs;
+    if (operationTimeoutNs > 0) {
+      locateTimeoutNs = remainingTimeNs();
+      if (locateTimeoutNs <= 0) {
+        completeExceptionally();
+        return;
+      }
+    } else {
+      locateTimeoutNs = -1L;
+    }
+    addListener(conn.getLocator().getRegionLocation(replica.getTable(), replica.getStartKey(),
+      replica.getReplicaId(), RegionLocateType.CURRENT, locateTimeoutNs), (loc, error) -> {
+        if (error != null) {
+          onError(error, () -> "Locate " + replica + " failed", err -> {
+          });
+          return;
+        }
+        call(loc);
+      });
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index b41619a..9a7ba92 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -679,6 +679,17 @@ public abstract class RpcServer implements RpcServerInterface,
     return Optional.ofNullable(CurCall.get());
   }
 
+  /**
+   * Returns the current rpc call if it is a {@link ServerCall} that also has a {@link CellScanner}
+   * attached, otherwise an empty {@link Optional}.
+   * <p/>
+   * Mainly used for reference counting, as a {@link CellScanner} may reference off-heap memory.
+   */
+  public static Optional<ServerCall<?>> getCurrentServerCallWithCellScanner() {
+    return getCurrentCall().filter(c -> c instanceof ServerCall)
+      .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall<?>) c);
+  }
+
   public static boolean isInRpcCallContext() {
     return CurCall.get() != null;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/janitor/MetaFixer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/janitor/MetaFixer.java
index 4a5aa0a..64ee49e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/janitor/MetaFixer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/janitor/MetaFixer.java
@@ -29,7 +29,6 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
-import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.TableName;
@@ -40,10 +39,8 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.exceptions.MergeRegionException;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
-import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -197,15 +194,9 @@ public class MetaFixer {
           MetaTableAccessor.addRegionsToMeta(masterServices.getConnection(), newRegions,
             td.getRegionReplication());
 
-          // Setup replication for region replicas if needed
-          if (td.getRegionReplication() > 1) {
-            ServerRegionReplicaUtil.setupRegionReplicaReplication(masterServices);
-          }
           return Either.<List<RegionInfo>, IOException> ofLeft(newRegions);
         } catch (IOException e) {
           return Either.<List<RegionInfo>, IOException> ofRight(e);
-        } catch (ReplicationException e) {
-          return Either.<List<RegionInfo>, IOException> ofRight(new HBaseIOException(e));
         }
       })
       .collect(Collectors.toList());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
index 2313e70..723f851 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
@@ -25,7 +25,6 @@ import java.util.function.Supplier;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableExistsException;
@@ -37,12 +36,10 @@ import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
-import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.ModifyRegionUtils;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -378,14 +375,6 @@ public class CreateTableProcedure
     // Add regions to META
     addRegionsToMeta(env, tableDescriptor, newRegions);
 
-    // Setup replication for region replicas if needed
-    if (tableDescriptor.getRegionReplication() > 1) {
-      try {
-        ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterServices());
-      } catch (ReplicationException e) {
-        throw new HBaseIOException(e);
-      }
-    }
     return newRegions;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
index 247dd9c..aedb42c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
@@ -38,10 +38,8 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
-import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -414,13 +412,6 @@ public class ModifyTableProcedure
         .collect(Collectors.toList());
       addChildProcedure(env.getAssignmentManager().createAssignProcedures(newReplicas));
     }
-    if (oldReplicaCount <= 1) {
-      try {
-        ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterServices());
-      } catch (ReplicationException e) {
-        throw new HBaseIOException(e);
-      }
-    }
   }
 
   private void closeExcessReplicasIfNeeded(MasterProcedureEnv env) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index b534f68..6f6f6b3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -137,6 +137,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
 import org.apache.hadoop.hbase.ipc.RpcCall;
 import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.ServerCall;
 import org.apache.hadoop.hbase.mob.MobFileCache;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -194,6 +195,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Service;
 import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
@@ -711,6 +713,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   private final StoreHotnessProtector storeHotnessProtector;
 
+  private Optional<RegionReplicationSink> regionReplicationSink = Optional.empty();
+
   /**
    * HRegion constructor. This constructor should only be used for testing and
    * extensions.  Instances of HRegion should be instantiated with the
@@ -1083,11 +1087,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       status.setStatus("Running coprocessor post-open hooks");
       coprocessorHost.postOpen();
     }
-
+    initializeRegionReplicationSink(reporter, status);
     status.markComplete("Region opened successfully");
     return nextSeqId;
   }
 
+  private void initializeRegionReplicationSink(CancelableProgressable reporter,
+    MonitoredTask status) {
+    RegionServerServices rss = getRegionServerServices();
+    TableDescriptor td = getTableDescriptor();
+    int regionReplication = td.getRegionReplication();
+    RegionInfo regionInfo = getRegionInfo();
+    if (regionReplication <= 1 || !RegionReplicaUtil.isDefaultReplica(regionInfo) ||
+      !ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(conf, regionInfo.getTable()) ||
+      rss == null) {
+      regionReplicationSink = Optional.empty();
+      return;
+    }
+    status.setStatus("Initializaing region replication sink");
+    regionReplicationSink = Optional.of(new RegionReplicationSink(conf, regionInfo,
+      regionReplication, td.hasRegionMemStoreReplication(), rss.getAsyncClusterConnection()));
+  }
+
   /**
    * Open all Stores.
    * @param reporter
@@ -1210,7 +1231,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
       getRegionServerServices().getServerName(), storeFiles);
     WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionOpenDesc,
-        mvcc);
+        mvcc, regionReplicationSink.orElse(null));
   }
 
   private void writeRegionCloseMarker(WAL wal) throws IOException {
@@ -1219,7 +1240,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), mvcc.getReadPoint(),
       getRegionServerServices().getServerName(), storeFiles);
     WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionEventDesc,
-        mvcc);
+      mvcc, null);
 
     // Store SeqId in WAL FileSystem when a region closes
     // checking region folder exists is due to many tests which delete the table folder while a
@@ -1864,7 +1885,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         RegionReplicaUtil.isDefaultReplica(getRegionInfo())) {
         writeRegionCloseMarker(wal);
       }
-
+      if (regionReplicationSink.isPresent()) {
+        // stop replicating to secondary replicas
+        RegionReplicationSink sink = regionReplicationSink.get();
+        sink.stop();
+        try {
+          regionReplicationSink.get().waitUntilStopped();
+        } catch (InterruptedException e) {
+          throw throwOnInterrupt(e);
+        }
+      }
       this.closed.set(true);
       if (!canFlush) {
         decrMemStoreSize(this.memStoreSizing.getMemStoreSize());
@@ -2825,7 +2855,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             getRegionInfo(), flushOpSeqId, committedFiles);
         // No sync. Sync is below where no updates lock and we do FlushAction.COMMIT_FLUSH
         WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false,
-            mvcc);
+          mvcc, null);
       }
 
       // Prepare flush (take a snapshot)
@@ -2886,8 +2916,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     try {
       FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
           getRegionInfo(), flushOpSeqId, committedFiles);
-      WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false,
-          mvcc);
+      WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false, mvcc,
+        null);
     } catch (Throwable t) {
       LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL: {} in "
         + " region {}", StringUtils.stringifyException(t), this);
@@ -2931,8 +2961,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH,
         getRegionInfo(), -1, new TreeMap<>(Bytes.BYTES_COMPARATOR));
       try {
-        WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true,
-            mvcc);
+        WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true, mvcc,
+          null);
         return true;
       } catch (IOException e) {
         LOG.warn(getRegionInfo().getEncodedName() + " : "
@@ -3011,8 +3041,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         // write flush marker to WAL. If fail, we should throw DroppedSnapshotException
         FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
           getRegionInfo(), flushOpSeqId, committedFiles);
-        WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true,
-            mvcc);
+        WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true, mvcc,
+          regionReplicationSink.orElse(null));
       }
     } catch (Throwable t) {
       // An exception here means that the snapshot was not persisted.
@@ -3025,7 +3055,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         try {
           FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
             getRegionInfo(), flushOpSeqId, committedFiles);
-          WALUtil.writeFlushMarker(wal, this.replicationScope, getRegionInfo(), desc, false, mvcc);
+          WALUtil.writeFlushMarker(wal, this.replicationScope, getRegionInfo(), desc, false, mvcc,
+            null);
         } catch (Throwable ex) {
           LOG.warn(getRegionInfo().getEncodedName() + " : "
               + "failed writing ABORT_FLUSH marker to WAL", ex);
@@ -7069,10 +7100,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         try {
           WALProtos.BulkLoadDescriptor loadDescriptor =
               ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
-                  UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
-                  storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
+              UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
+              storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
           WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
-              loadDescriptor, mvcc);
+            loadDescriptor, mvcc, regionReplicationSink.orElse(null));
         } catch (IOException ioe) {
           if (this.rsServices != null) {
             // Have to abort region server because some hfiles has been loaded but we can't write
@@ -7759,21 +7790,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     if (this.coprocessorHost != null && !walEdit.isMetaEdit()) {
       this.coprocessorHost.preWALAppend(walKey, walEdit);
     }
-    WriteEntry writeEntry = null;
+    ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null);
     try {
       long txid = this.wal.appendData(this.getRegionInfo(), walKey, walEdit);
+      WriteEntry writeEntry = walKey.getWriteEntry();
+      regionReplicationSink.ifPresent(sink -> writeEntry.attachCompletionAction(() -> {
+        sink.add(walKey, walEdit, rpcCall);
+      }));
       // Call sync on our edit.
       if (txid != 0) {
         sync(txid, durability);
       }
-      writeEntry = walKey.getWriteEntry();
+      return writeEntry;
     } catch (IOException ioe) {
-      if (walKey != null && walKey.getWriteEntry() != null) {
+      if (walKey.getWriteEntry() != null) {
         mvcc.complete(walKey.getWriteEntry());
       }
       throw ioe;
     }
-    return writeEntry;
+
   }
 
   public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HRegion.class, false);
@@ -8428,6 +8463,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
+  public Optional<RegionReplicationSink> getRegionReplicationSink() {
+    return regionReplicationSink;
+  }
+
   public void addReadRequestsCount(long readRequestsCount) {
     this.readRequestsCount.add(readRequestsCount);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 32693ab..73abc84 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1572,7 +1572,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     // Does this method belong in Region altogether given it is making so many references up there?
     // Could be Region#writeCompactionMarker(compactionDescriptor);
     WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(),
-        this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC());
+      this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC(),
+      region.getRegionReplicationSink().orElse(null));
   }
 
   void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
index d821eec..91ae03d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.util.LinkedList;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -202,6 +203,7 @@ public class MultiVersionConcurrencyControl {
         if (queueFirst.isCompleted()) {
           nextReadValue = queueFirst.getWriteNumber();
           writeQueue.removeFirst();
+          queueFirst.runCompletionAction();
         } else {
           break;
         }
@@ -271,22 +273,36 @@ public class MultiVersionConcurrencyControl {
    * Every created WriteEntry must be completed by calling mvcc#complete or #completeAndWait.
    */
   @InterfaceAudience.Private
-  public static class WriteEntry {
+  public static final class WriteEntry {
     private final long writeNumber;
     private boolean completed = false;
+    /**
+     * Will be called after completion, i.e., when being removed from the
+     * {@link MultiVersionConcurrencyControl#writeQueue}.
+     */
+    private Optional<Runnable> completionAction = Optional.empty();
 
-    WriteEntry(long writeNumber) {
+    private WriteEntry(long writeNumber) {
       this.writeNumber = writeNumber;
     }
 
-    void markCompleted() {
+    private void markCompleted() {
       this.completed = true;
     }
 
-    boolean isCompleted() {
+    private boolean isCompleted() {
       return this.completed;
     }
 
+    public void attachCompletionAction(Runnable action) {
+      assert !completionAction.isPresent();
+      completionAction = Optional.of(action);
+    }
+
+    private void runCompletionAction() {
+      completionAction.ifPresent(Runnable::run);
+    }
+
     public long getWriteNumber() {
       return this.writeNumber;
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java
new file mode 100644
index 0000000..6911289
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.ipc.ServerCall;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
+/**
+ * The class for replicating WAL edits to secondary replicas, one instance per region.
+ */
+@InterfaceAudience.Private
+public class RegionReplicationSink {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RegionReplicationSink.class);
+
+  public static final String MAX_PENDING_SIZE = "hbase.region.read-replica.sink.max-pending-size";
+
+  public static final long MAX_PENDING_SIZE_DEFAULT = 10L * 1024 * 1024;
+
+  public static final String RETRIES_NUMBER = "hbase.region.read-replica.sink.retries.number";
+
+  public static final int RETRIES_NUMBER_DEFAULT = 3;
+
+  public static final String RPC_TIMEOUT_MS = "hbase.region.read-replica.sink.rpc.timeout.ms";
+
+  public static final long RPC_TIMEOUT_MS_DEFAULT = 200;
+
+  public static final String OPERATION_TIMEOUT_MS =
+    "hbase.region.read-replica.sink.operation.timeout.ms";
+
+  public static final long OPERATION_TIMEOUT_MS_DEFAULT = 1000;
+
+  private static final class SinkEntry {
+
+    final WALKeyImpl key;
+
+    final WALEdit edit;
+
+    final ServerCall<?> rpcCall;
+
+    SinkEntry(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
+      this.key = key;
+      this.edit = edit;
+      this.rpcCall = rpcCall;
+      if (rpcCall != null) {
+        // Increase the reference count so that the rpc framework does not free the memory before
+        // we have actually sent the cells out.
+        rpcCall.retainByWAL();
+      }
+    }
+
+    /**
+     * Should be called regardless of the result of the replicating operation. Unless you still want
+     * to reuse this entry, you must call this method to release any off heap memory it may be
+     * holding.
+     */
+    void replicated() {
+      if (rpcCall != null) {
+        rpcCall.releaseByWAL();
+      }
+    }
+  }
+
+  private final RegionInfo primary;
+
+  private final int regionReplication;
+
+  private final boolean hasRegionMemStoreReplication;
+
+  private final Queue<SinkEntry> entries = new ArrayDeque<>();
+
+  private final AsyncClusterConnection conn;
+
+  private final int retries;
+
+  private final long rpcTimeoutNs;
+
+  private final long operationTimeoutNs;
+
+  private CompletableFuture<Void> future;
+
+  private boolean stopping;
+
+  private boolean stopped;
+
+  RegionReplicationSink(Configuration conf, RegionInfo primary, int regionReplication,
+    boolean hasRegionMemStoreReplication, AsyncClusterConnection conn) {
+    Preconditions.checkArgument(RegionReplicaUtil.isDefaultReplica(primary), "%s is not primary",
+      primary);
+    Preconditions.checkArgument(regionReplication > 1,
+      "region replication should be greater than 1 but got %s", regionReplication);
+    this.primary = primary;
+    this.regionReplication = regionReplication;
+    this.hasRegionMemStoreReplication = hasRegionMemStoreReplication;
+    this.conn = conn;
+    this.retries = conf.getInt(RETRIES_NUMBER, RETRIES_NUMBER_DEFAULT);
+    this.rpcTimeoutNs =
+      TimeUnit.MILLISECONDS.toNanos(conf.getLong(RPC_TIMEOUT_MS, RPC_TIMEOUT_MS_DEFAULT));
+    this.operationTimeoutNs = TimeUnit.MILLISECONDS
+      .toNanos(conf.getLong(OPERATION_TIMEOUT_MS, OPERATION_TIMEOUT_MS_DEFAULT));
+  }
+
+  private void send() {
+    List<SinkEntry> toSend = new ArrayList<>();
+    for (SinkEntry entry;;) {
+      entry = entries.poll();
+      if (entry == null) {
+        break;
+      }
+      toSend.add(entry);
+    }
+    List<WAL.Entry> walEntries =
+      toSend.stream().map(e -> new WAL.Entry(e.key, e.edit)).collect(Collectors.toList());
+    List<CompletableFuture<Void>> futures = new ArrayList<>();
+    for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
+      RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, replicaId);
+      futures.add(conn.replicate(replica, walEntries, retries, rpcTimeoutNs, operationTimeoutNs));
+    }
+    future = CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
+    FutureUtils.addListener(future, (r, e) -> {
+      if (e != null) {
+        // TODO: drop pending edits and issue a flush
+        LOG.warn("Failed to replicate to secondary replicas for {}", primary, e);
+      }
+      toSend.forEach(SinkEntry::replicated);
+      synchronized (entries) {
+        future = null;
+        if (stopping) {
+          stopped = true;
+          entries.notifyAll();
+          return;
+        }
+        if (!entries.isEmpty()) {
+          send();
+        }
+      }
+    });
+  }
+
+  /**
+   * Add this edit to the replication queue.
+   * <p/>
+   * The {@code rpcCall} is for retaining the cells if the edit is built within an rpc call and the
+   * rpc call has a cell scanner, which is off heap.
+   */
+  public void add(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
+    if (!hasRegionMemStoreReplication && !edit.isMetaEdit()) {
+      // only replicate meta edit if region memstore replication is not enabled
+      return;
+    }
+    synchronized (entries) {
+      if (stopping) {
+        return;
+      }
+      // TODO: limit the total cached entries here, and we should have a global limitation, not for
+      // only this region.
+      entries.add(new SinkEntry(key, edit, rpcCall));
+      if (future == null) {
+        send();
+      }
+    }
+  }
+
+  /**
+   * Stop the replication sink.
+   * <p/>
+   * Usually this should only be called when you want to close a region.
+   */
+  void stop() {
+    synchronized (entries) {
+      stopping = true;
+      if (future == null) {
+        stopped = true;
+        entries.notifyAll();
+      }
+    }
+  }
+
+  /**
+   * Make sure that we have finished all the replicating requests.
+   * <p/>
+   * After returning, we can make sure there will be no new replicating requests to secondary
+   * replicas.
+   * <p/>
+   * This is used to keep the replicating order the same as the WAL edit order when writing.
+   */
+  void waitUntilStopped() throws InterruptedException {
+    synchronized (entries) {
+      while (!stopped) {
+        entries.wait();
+      }
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
index 5d9819c..101c9c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
@@ -22,9 +22,7 @@ import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
@@ -34,10 +32,10 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices.PostOpenDeployContext;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
 import org.apache.hadoop.hbase.util.RetryCounter;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 
 /**
@@ -134,14 +132,6 @@ public class AssignRegionHandler extends EventHandler {
       // pass null for the last parameter, which used to be a CancelableProgressable, as now the
       // opening can not be interrupted by a close request any more.
       Configuration conf = rs.getConfiguration();
-      TableName tn = htd.getTableName();
-      if (ServerRegionReplicaUtil.isMetaRegionReplicaReplicationEnabled(conf, tn)) {
-        if (RegionReplicaUtil.isDefaultReplica(this.regionInfo.getReplicaId())) {
-          // Add the hbase:meta replication source on replica zero/default.
-          rs.getReplicationSourceService().getReplicationManager().
-            addCatalogReplicationSource(this.regionInfo);
-        }
-      }
       region = HRegion.openHRegion(regionInfo, htd, rs.getWAL(regionInfo), conf, rs, null);
     } catch (IOException e) {
       cleanUpAndReportFailure(e);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
index 0d02f30..2ac55ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -31,10 +30,10 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.RetryCounter;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 
 /**
@@ -121,15 +120,6 @@ public class UnassignRegionHandler extends EventHandler {
     }
 
     rs.removeRegion(region, destination);
-    if (ServerRegionReplicaUtil.isMetaRegionReplicaReplicationEnabled(rs.getConfiguration(),
-        region.getTableDescriptor().getTableName())) {
-      if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo().getReplicaId())) {
-        // If hbase:meta read replicas enabled, remove replication source for hbase:meta Regions.
-        // See assign region handler where we add the replication source on open.
-        rs.getReplicationSourceService().getReplicationManager().
-          removeCatalogReplicationSource(region.getRegionInfo());
-      }
-    }
     if (!rs.reportRegionStateTransition(
       new RegionStateTransitionContext(TransitionCode.CLOSED, HConstants.NO_SEQNUM, closeProcId,
         -1, region.getRegionInfo()))) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 002d9b7..f0e4a1f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -1164,8 +1164,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
       txidHolder.setValue(ringBuffer.next());
     });
     long txid = txidHolder.longValue();
-    ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
-      .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
+    ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null);
     try {
       FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
       entry.stampRegionSequenceId(we);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
index 23db3dc..15be95e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
@@ -28,7 +28,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
+import org.apache.hadoop.hbase.regionserver.RegionReplicationSink;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -71,9 +74,9 @@ public class WALUtil {
    */
   public static WALKeyImpl writeCompactionMarker(WAL wal,
     NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final CompactionDescriptor c,
-    MultiVersionConcurrencyControl mvcc) throws IOException {
+    MultiVersionConcurrencyControl mvcc, RegionReplicationSink sink) throws IOException {
     WALKeyImpl walKey =
-      writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null);
+      writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null, sink);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
     }
@@ -86,10 +89,10 @@ public class WALUtil {
    * This write is for internal use only. Not for external client consumption.
    */
   public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap<byte[], Integer> replicationScope,
-    RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
-    throws IOException {
+    RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc,
+    RegionReplicationSink sink) throws IOException {
     WALKeyImpl walKey = doFullMarkerAppendTransaction(wal, replicationScope, hri,
-      WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync);
+      WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync, sink);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
     }
@@ -102,9 +105,9 @@ public class WALUtil {
    */
   public static WALKeyImpl writeRegionEventMarker(WAL wal,
     NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, RegionEventDescriptor r,
-    MultiVersionConcurrencyControl mvcc) throws IOException {
-    WALKeyImpl walKey =
-      writeMarker(wal, replicationScope, hri, WALEdit.createRegionEventWALEdit(hri, r), mvcc, null);
+    MultiVersionConcurrencyControl mvcc, RegionReplicationSink sink) throws IOException {
+    WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
+      WALEdit.createRegionEventWALEdit(hri, r), mvcc, null, sink);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
     }
@@ -122,11 +125,11 @@ public class WALUtil {
    * @throws IOException We will throw an IOException if we can not append to the HLog.
    */
   public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal,
-      final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
-      final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc)
-    throws IOException {
+    final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
+    final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc,
+    final RegionReplicationSink sink) throws IOException {
     WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
-      WALEdit.createBulkLoadEvent(hri, desc), mvcc, null);
+      WALEdit.createBulkLoadEvent(hri, desc), mvcc, null, sink);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
     }
@@ -135,11 +138,11 @@ public class WALUtil {
 
   private static WALKeyImpl writeMarker(final WAL wal,
     final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, final WALEdit edit,
-    final MultiVersionConcurrencyControl mvcc, final Map<String, byte[]> extendedAttributes)
-    throws IOException {
+    final MultiVersionConcurrencyControl mvcc, final Map<String, byte[]> extendedAttributes,
+    final RegionReplicationSink sink) throws IOException {
     // If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT
     return doFullMarkerAppendTransaction(wal, replicationScope, hri, edit, mvcc, extendedAttributes,
-      true);
+      true, sink);
   }
 
   /**
@@ -152,19 +155,24 @@ public class WALUtil {
    */
   private static WALKeyImpl doFullMarkerAppendTransaction(final WAL wal,
     final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, final WALEdit edit,
-    final MultiVersionConcurrencyControl mvcc,
-    final Map<String, byte[]> extendedAttributes, final boolean sync) throws IOException {
+    final MultiVersionConcurrencyControl mvcc, final Map<String, byte[]> extendedAttributes,
+    final boolean sync, final RegionReplicationSink sink) throws IOException {
     // TODO: Pass in current time to use?
     WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(),
       EnvironmentEdgeManager.currentTime(), mvcc, replicationScope, extendedAttributes);
     long trx = MultiVersionConcurrencyControl.NONE;
     try {
       trx = wal.appendMarker(hri, walKey, edit);
+      WriteEntry writeEntry = walKey.getWriteEntry();
+      if (sink != null) {
+        writeEntry.attachCompletionAction(() -> sink.add(walKey, edit,
+          RpcServer.getCurrentServerCallWithCellScanner().orElse(null)));
+      }
       if (sync) {
         wal.sync(trx);
       }
       // Call complete only here because these are markers only. They are not for clients to read.
-      mvcc.complete(walKey.getWriteEntry());
+      mvcc.complete(writeEntry);
     } catch (IOException ioe) {
       if (walKey.getWriteEntry() != null) {
         mvcc.complete(walKey.getWriteEntry());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java
deleted file mode 100644
index 8cb7860..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import java.util.Collections;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * ReplicationSource that reads catalog WAL files -- e.g. hbase:meta WAL files -- and lets through
- * all WALEdits from these WALs. This ReplicationSource is NOT created via
- * {@link ReplicationSourceFactory}.
- */
-@InterfaceAudience.Private
-class CatalogReplicationSource extends ReplicationSource {
-  CatalogReplicationSource() {
-    // Filters in hbase:meta WAL files and allows all edits, including 'meta' edits (these are
-    // filtered out in the 'super' class default implementation).
-    super(p -> AbstractFSWALProvider.isMetaFile(p), Collections.emptyList());
-  }
-
-  @Override
-  public void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {
-    // Noop. This CatalogReplicationSource implementation does not persist state to backing storage
-    // nor does it keep its WALs in a general map up in ReplicationSourceManager --
-    // CatalogReplicationSource is used by the Catalog Read Replica feature which resets everytime
-    // the WAL source process crashes. Skip calling through to the default implementation.
-    // See "4.1 Skip maintaining zookeeper replication queue (offsets/WALs)" in the
-    // design doc attached to HBASE-18070 'Enable memstore replication for meta replica for detail'
-    // for background on why no need to keep WAL state.
-  }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSourcePeer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSourcePeer.java
deleted file mode 100644
index 3bcd414..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSourcePeer.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
-import org.apache.hadoop.hbase.replication.SyncReplicationState;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * The 'peer' used internally by Catalog Region Replicas Replication Source.
- * The Replication system has 'peer' baked into its core so though we do not need 'peering', we
- * need a 'peer' and its configuration else the replication system breaks at a few locales.
- * Set "hbase.region.replica.catalog.replication" if you want to change the configured endpoint.
- */
-@InterfaceAudience.Private
-class CatalogReplicationSourcePeer extends ReplicationPeerImpl {
-  /**
-   * @param clusterKey Usually the UUID from zk passed in by caller as a String.
-   */
-  CatalogReplicationSourcePeer(Configuration configuration, String clusterKey) {
-    super(configuration, ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER + "_catalog",
-      ReplicationPeerConfig.newBuilder().
-        setClusterKey(clusterKey).
-        setReplicationEndpointImpl(
-          configuration.get("hbase.region.replica.catalog.replication",
-            RegionReplicaReplicationEndpoint.class.getName())).
-        setBandwidth(0). // '0' means no bandwidth.
-        setSerial(false).
-        build(),
-      true, SyncReplicationState.NONE, SyncReplicationState.NONE);
-  }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
deleted file mode 100644
index 17e7a53..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ /dev/null
@@ -1,407 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.TreeMap;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.TableDescriptors;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.AsyncClusterConnection;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.WALEntryFilter;
-import org.apache.hadoop.hbase.util.AtomicUtils;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FutureUtils;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.RetryCounter;
-import org.apache.hadoop.hbase.util.RetryCounterFactory;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
-import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
-import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
-import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
-
-/**
- * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint which receives the WAL
- * edits from the WAL, and sends the edits to replicas of regions.
- */
-@InterfaceAudience.Private
-public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
-
-  private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaReplicationEndpoint.class);
-
-  // Can be configured differently than hbase.client.retries.number
-  private static String CLIENT_RETRIES_NUMBER =
-    "hbase.region.replica.replication.client.retries.number";
-
-  private Configuration conf;
-  private AsyncClusterConnection connection;
-  private TableDescriptors tableDescriptors;
-
-  private int numRetries;
-
-  private long operationTimeoutNs;
-
-  private LoadingCache<TableName, Optional<TableDescriptor>> tableDescriptorCache;
-
-  private Cache<TableName, TableName> disabledTableCache;
-
-  private final RetryCounterFactory retryCounterFactory =
-    new RetryCounterFactory(Integer.MAX_VALUE, 1000, 60000);
-
-  @Override
-  public void init(Context context) throws IOException {
-    super.init(context);
-    this.conf = context.getConfiguration();
-    this.tableDescriptors = context.getTableDescriptors();
-    int memstoreReplicationEnabledCacheExpiryMs = conf
-      .getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000);
-    // A cache for the table "memstore replication enabled" flag.
-    // It has a default expiry of 5 sec. This means that if the table is altered
-    // with a different flag value, we might miss to replicate for that amount of
-    // time. But this cache avoid the slow lookup and parsing of the TableDescriptor.
-    tableDescriptorCache = CacheBuilder.newBuilder()
-      .expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, TimeUnit.MILLISECONDS)
-      .initialCapacity(10).maximumSize(1000)
-      .build(new CacheLoader<TableName, Optional<TableDescriptor>>() {
-
-        @Override
-        public Optional<TableDescriptor> load(TableName tableName) throws Exception {
-          // check if the table requires memstore replication
-          // some unit-test drop the table, so we should do a bypass check and always replicate.
-          return Optional.ofNullable(tableDescriptors.get(tableName));
-        }
-      });
-    int nonExistentTableCacheExpiryMs =
-      conf.getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
-    // A cache for non existing tables that have a default expiry of 5 sec. This means that if the
-    // table is created again with the same name, we might miss to replicate for that amount of
-    // time. But this cache prevents overloading meta requests for every edit from a deleted file.
-    disabledTableCache = CacheBuilder.newBuilder()
-      .expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS).initialCapacity(10)
-      .maximumSize(1000).build();
-    // HRS multiplies client retries by 10 globally for meta operations, but we do not want this.
-    // We are resetting it here because we want default number of retries (35) rather than 10 times
-    // that which makes very long retries for disabled tables etc.
-    int defaultNumRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
-    if (defaultNumRetries > 10) {
-      int mult = conf.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER,
-        HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER);
-      defaultNumRetries = defaultNumRetries / mult; // reset if HRS has multiplied this already
-    }
-    this.numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries);
-    // use the regular RPC timeout for replica replication RPC's
-    this.operationTimeoutNs =
-      TimeUnit.MILLISECONDS.toNanos(conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
-        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
-    this.connection = context.getServer().getAsyncClusterConnection();
-  }
-
-  /**
-   * returns true if the specified entry must be replicated. We should always replicate meta
-   * operations (e.g. flush) and use the user HTD flag to decide whether or not replicate the
-   * memstore.
-   */
-  private boolean requiresReplication(Optional<TableDescriptor> tableDesc,
-      Entry entry) {
-    // empty edit does not need to be replicated
-    if (entry.getEdit().isEmpty() || !tableDesc.isPresent()) {
-      return false;
-    }
-    // meta edits (e.g. flush) must be always replicated
-    return entry.getEdit().isMetaEdit() || tableDesc.get().hasRegionMemStoreReplication();
-  }
-
-  private void getRegionLocations(CompletableFuture<RegionLocations> future,
-      TableDescriptor tableDesc, byte[] encodedRegionName, byte[] row, boolean reload) {
-    FutureUtils.addListener(connection.getRegionLocations(tableDesc.getTableName(), row, reload),
-      (locs, e) -> {
-        if (e != null) {
-          future.completeExceptionally(e);
-          return;
-        }
-        // if we are not loading from cache, just return
-        if (reload) {
-          future.complete(locs);
-          return;
-        }
-        // check if the number of region replicas is correct, and also the primary region name
-        // matches.
-        if (locs.size() == tableDesc.getRegionReplication() &&
-          locs.getDefaultRegionLocation() != null &&
-          Bytes.equals(locs.getDefaultRegionLocation().getRegion().getEncodedNameAsBytes(),
-            encodedRegionName)) {
-          future.complete(locs);
-        } else {
-          // reload again as the information in cache maybe stale
-          getRegionLocations(future, tableDesc, encodedRegionName, row, true);
-        }
-      });
-  }
-
-  private void replicate(CompletableFuture<Long> future, RegionLocations locs,
-      TableDescriptor tableDesc, byte[] encodedRegionName, byte[] row, List<Entry> entries) {
-    if (locs.size() == 1) {
-      LOG.info("Only one location for {}.{}, refresh the location cache only for meta now",
-        tableDesc.getTableName(), Bytes.toString(encodedRegionName));
-
-      // This could happen to meta table. In case of meta table comes with no replica and
-      // later it is changed to multiple replicas. The cached location for meta may only has
-      // the primary region. In this case, it needs to clean up and refresh the cached meta
-      // locations.
-      if (tableDesc.isMetaTable()) {
-        connection.getRegionLocator(tableDesc.getTableName()).clearRegionLocationCache();
-      }
-      future.complete(Long.valueOf(entries.size()));
-      return;
-    }
-    RegionInfo defaultReplica = locs.getDefaultRegionLocation().getRegion();
-    if (!Bytes.equals(defaultReplica.getEncodedNameAsBytes(), encodedRegionName)) {
-      // the region name is not equal, this usually means the region has been split or merged, so
-      // give up replicating as the new region(s) should already have all the data of the parent
-      // region(s).
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(
-          "Skipping {} entries in table {} because located region {} is different than" +
-            " the original region {} from WALEdit",
-          tableDesc.getTableName(), defaultReplica.getEncodedName(),
-          Bytes.toStringBinary(encodedRegionName));
-      }
-      future.complete(Long.valueOf(entries.size()));
-      return;
-    }
-    AtomicReference<Throwable> error = new AtomicReference<>();
-    AtomicInteger remainingTasks = new AtomicInteger(locs.size() - 1);
-    AtomicLong skippedEdits = new AtomicLong(0);
-
-    for (int i = 1, n = locs.size(); i < n; i++) {
-      // Do not use the elements other than the default replica as they may be null. We will fail
-      // earlier if the location for default replica is null.
-      final RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(defaultReplica, i);
-      FutureUtils
-        .addListener(connection.replay(tableDesc.getTableName(), replica.getEncodedNameAsBytes(),
-          row, entries, replica.getReplicaId(), numRetries, operationTimeoutNs), (r, e) -> {
-            if (e != null) {
-              LOG.warn("Failed to replicate to {}", replica, e);
-              error.compareAndSet(null, e);
-            } else {
-              AtomicUtils.updateMax(skippedEdits, r.longValue());
-            }
-            if (remainingTasks.decrementAndGet() == 0) {
-              if (error.get() != null) {
-                future.completeExceptionally(error.get());
-              } else {
-                future.complete(skippedEdits.get());
-              }
-            }
-          });
-    }
-  }
-
-  private void logSkipped(TableName tableName, List<Entry> entries, String reason) {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Skipping {} entries because table {} is {}", entries.size(), tableName, reason);
-      for (Entry entry : entries) {
-        LOG.trace("Skipping : {}", entry);
-      }
-    }
-  }
-
-  private CompletableFuture<Long> replicate(TableDescriptor tableDesc, byte[] encodedRegionName,
-      List<Entry> entries) {
-    if (disabledTableCache.getIfPresent(tableDesc.getTableName()) != null) {
-      logSkipped(tableDesc.getTableName(), entries, "cached as a disabled table");
-      return CompletableFuture.completedFuture(Long.valueOf(entries.size()));
-    }
-    byte[] row = CellUtil.cloneRow(entries.get(0).getEdit().getCells().get(0));
-    CompletableFuture<RegionLocations> locateFuture = new CompletableFuture<>();
-    getRegionLocations(locateFuture, tableDesc, encodedRegionName, row, false);
-    CompletableFuture<Long> future = new CompletableFuture<>();
-    FutureUtils.addListener(locateFuture, (locs, error) -> {
-      if (error != null) {
-        future.completeExceptionally(error);
-      } else if (locs.getDefaultRegionLocation() == null) {
-        future.completeExceptionally(
-          new HBaseIOException("No location found for default replica of table=" +
-            tableDesc.getTableName() + " row='" + Bytes.toStringBinary(row) + "'"));
-      } else {
-        replicate(future, locs, tableDesc, encodedRegionName, row, entries);
-      }
-    });
-    return future;
-  }
-
-  @Override
-  public boolean replicate(ReplicateContext replicateContext) {
-    Map<byte[], Pair<TableDescriptor, List<Entry>>> encodedRegionName2Entries =
-      new TreeMap<>(Bytes.BYTES_COMPARATOR);
-    long skippedEdits = 0;
-    RetryCounter retryCounter = retryCounterFactory.create();
-    outer: while (isRunning()) {
-      encodedRegionName2Entries.clear();
-      skippedEdits = 0;
-      for (Entry entry : replicateContext.getEntries()) {
-        Optional<TableDescriptor> tableDesc;
-        try {
-          tableDesc = tableDescriptorCache.get(entry.getKey().getTableName());
-        } catch (ExecutionException e) {
-          LOG.warn("Failed to load table descriptor for {}, attempts={}",
-            entry.getKey().getTableName(), retryCounter.getAttemptTimes(), e.getCause());
-          if (!retryCounter.shouldRetry()) {
-            return false;
-          }
-          try {
-            retryCounter.sleepUntilNextRetry();
-          } catch (InterruptedException e1) {
-            // restore the interrupted state
-            Thread.currentThread().interrupt();
-            return false;
-          }
-          continue outer;
-        }
-        if (!requiresReplication(tableDesc, entry)) {
-          skippedEdits++;
-          continue;
-        }
-        byte[] encodedRegionName = entry.getKey().getEncodedRegionName();
-        encodedRegionName2Entries
-          .computeIfAbsent(encodedRegionName, k -> Pair.newPair(tableDesc.get(), new ArrayList<>()))
-          .getSecond().add(entry);
-      }
-      break;
-    }
-    // send the request to regions
-    retryCounter = retryCounterFactory.create();
-    while (isRunning()) {
-      List<Pair<CompletableFuture<Long>, byte[]>> futureAndEncodedRegionNameList =
-        new ArrayList<Pair<CompletableFuture<Long>, byte[]>>();
-      for (Map.Entry<byte[], Pair<TableDescriptor, List<Entry>>> entry : encodedRegionName2Entries
-        .entrySet()) {
-        CompletableFuture<Long> future =
-          replicate(entry.getValue().getFirst(), entry.getKey(), entry.getValue().getSecond());
-        futureAndEncodedRegionNameList.add(Pair.newPair(future, entry.getKey()));
-      }
-      for (Pair<CompletableFuture<Long>, byte[]> pair : futureAndEncodedRegionNameList) {
-        byte[] encodedRegionName = pair.getSecond();
-        try {
-          skippedEdits += pair.getFirst().get().longValue();
-          encodedRegionName2Entries.remove(encodedRegionName);
-        } catch (InterruptedException e) {
-          // restore the interrupted state
-          Thread.currentThread().interrupt();
-          return false;
-        } catch (ExecutionException e) {
-          Pair<TableDescriptor, List<Entry>> tableAndEntries =
-            encodedRegionName2Entries.get(encodedRegionName);
-          TableName tableName = tableAndEntries.getFirst().getTableName();
-          List<Entry> entries = tableAndEntries.getSecond();
-          Throwable cause = e.getCause();
-          // The table can be disabled or dropped at this time. For disabled tables, we have no
-          // cheap mechanism to detect this case because meta does not contain this information.
-          // ClusterConnection.isTableDisabled() is a zk call which we cannot do for every replay
-          // RPC. So instead we start the replay RPC with retries and check whether the table is
-          // dropped or disabled which might cause SocketTimeoutException, or
-          // RetriesExhaustedException or similar if we get IOE.
-          if (cause instanceof TableNotFoundException) {
-            // add to cache that the table does not exist
-            tableDescriptorCache.put(tableName, Optional.empty());
-            logSkipped(tableName, entries, "dropped");
-            skippedEdits += entries.size();
-            encodedRegionName2Entries.remove(encodedRegionName);
-            continue;
-          }
-          boolean disabled = false;
-          try {
-            disabled = connection.getAdmin().isTableDisabled(tableName).get();
-          } catch (InterruptedException e1) {
-            // restore the interrupted state
-            Thread.currentThread().interrupt();
-            return false;
-          } catch (ExecutionException e1) {
-            LOG.warn("Failed to test whether {} is disabled, assume it is not disabled", tableName,
-              e1.getCause());
-          }
-          if (disabled) {
-            disabledTableCache.put(tableName, tableName);
-            logSkipped(tableName, entries, "disabled");
-            skippedEdits += entries.size();
-            encodedRegionName2Entries.remove(encodedRegionName);
-            continue;
-          }
-          LOG.warn("Failed to replicate {} entries for region {} of table {}", entries.size(),
-            Bytes.toStringBinary(encodedRegionName), tableName);
-        }
-      }
-      // we have done
-      if (encodedRegionName2Entries.isEmpty()) {
-        ctx.getMetrics().incrLogEditsFiltered(skippedEdits);
-        return true;
-      } else {
-        LOG.warn("Failed to replicate all entries, retry={}", retryCounter.getAttemptTimes());
-        if (!retryCounter.shouldRetry()) {
-          return false;
-        }
-        try {
-          retryCounter.sleepUntilNextRetry();
-        } catch (InterruptedException e) {
-          // restore the interrupted state
-          Thread.currentThread().interrupt();
-          return false;
-        }
-      }
-    }
-
-    return false;
-  }
-
-  @Override
-  public boolean canReplicateToSameCluster() {
-    return true;
-  }
-
-  @Override
-  protected WALEntryFilter getScopeWALEntryFilter() {
-    // we do not care about scope. We replicate everything.
-    return null;
-  }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
index 8863f14..b055902 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
@@ -27,7 +27,6 @@ import org.slf4j.LoggerFactory;
 /**
  * Constructs a {@link ReplicationSourceInterface}
  * Note, not used to create specialized ReplicationSources
- * @see CatalogReplicationSource
  */
 @InterfaceAudience.Private
 public final class ReplicationSourceFactory {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 73efcfe..9f8d8dc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -39,7 +39,6 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -49,8 +48,6 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
@@ -64,9 +61,7 @@ import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
-import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -183,16 +178,6 @@ public class ReplicationSourceManager {
   private final MetricsReplicationGlobalSourceSource globalMetrics;
 
   /**
-   * A special ReplicationSource for hbase:meta Region Read Replicas.
-   * Usually this reference remains empty. If an hbase:meta Region is opened on this server, we
-   * will create an instance of a hbase:meta CatalogReplicationSource and it will live the life of
-   * the Server thereafter; i.e. we will not shut it down even if the hbase:meta moves away from
-   * this server (in case it later gets moved back). We synchronize on this instance testing for
-   * presence and if absent, while creating so only created and started once.
-   */
-  AtomicReference<ReplicationSourceInterface> catalogReplicationSource = new AtomicReference<>();
-
-  /**
    * Creates a replication manager and sets the watch on all the other registered region servers
    * @param queueStorage the interface for manipulating replication queues
    * @param conf the configuration to use
@@ -1066,78 +1051,6 @@ public class ReplicationSourceManager {
     return this.globalMetrics;
   }
 
-  /**
-   * Add an hbase:meta Catalog replication source. Called on open of an hbase:meta Region.
-   * Create it once only. If exists already, use the existing one.
-   * @see #removeCatalogReplicationSource(RegionInfo)
-   * @see #addSource(String) This is specialization on the addSource method.
-   */
-  public ReplicationSourceInterface addCatalogReplicationSource(RegionInfo regionInfo)
-      throws IOException {
-    // Poor-man's putIfAbsent
-    synchronized (this.catalogReplicationSource) {
-      ReplicationSourceInterface rs = this.catalogReplicationSource.get();
-      return rs != null ? rs :
-        this.catalogReplicationSource.getAndSet(createCatalogReplicationSource(regionInfo));
-    }
-  }
-
-  /**
-   * Remove the hbase:meta Catalog replication source.
-   * Called when we close hbase:meta.
-   * @see #addCatalogReplicationSource(RegionInfo regionInfo)
-   */
-  public void removeCatalogReplicationSource(RegionInfo regionInfo) {
-    // Nothing to do. Leave any CatalogReplicationSource in place in case an hbase:meta Region
-    // comes back to this server.
-  }
-
-  /**
-   * Create, initialize, and start the Catalog ReplicationSource.
-   * Presumes called one-time only (caller must ensure one-time only call).
-   * This ReplicationSource is NOT created via {@link ReplicationSourceFactory}.
-   * @see #addSource(String) This is a specialization of the addSource call.
-   * @see #catalogReplicationSource for a note on this ReplicationSource's lifecycle (and more on
-   *    why the special handling).
-   */
-  private ReplicationSourceInterface createCatalogReplicationSource(RegionInfo regionInfo)
-      throws IOException {
-    // Instantiate meta walProvider. Instantiated here or over in the #warmupRegion call made by the
-    // Master on a 'move' operation. Need to do extra work if we did NOT instantiate the provider.
-    WALProvider walProvider = this.walFactory.getMetaWALProvider();
-    boolean instantiate = walProvider == null;
-    if (instantiate) {
-      walProvider = this.walFactory.getMetaProvider();
-    }
-    // Here we do a specialization on what {@link ReplicationSourceFactory} does. There is no need
-    // for persisting offset into WALs up in zookeeper (via ReplicationQueueInfo) as the catalog
-    // read replicas feature that makes use of the source does a reset on a crash of the WAL
-    // source process. See "4.1 Skip maintaining zookeeper replication queue (offsets/WALs)" in the
-    // design doc attached to HBASE-18070 'Enable memstore replication for meta replica' for detail.
-    CatalogReplicationSourcePeer peer = new CatalogReplicationSourcePeer(this.conf,
-      this.clusterId.toString());
-    final ReplicationSourceInterface crs = new CatalogReplicationSource();
-    crs.init(conf, fs, this, new NoopReplicationQueueStorage(), peer, server, peer.getId(),
-      clusterId, walProvider.getWALFileLengthProvider(), new MetricsSource(peer.getId()));
-    // Add listener on the provider so we can pick up the WAL to replicate on roll.
-    WALActionsListener listener = new WALActionsListener() {
-      @Override public void postLogRoll(Path oldPath, Path newPath) throws IOException {
-        crs.enqueueLog(newPath);
-      }
-    };
-    walProvider.addWALActionsListener(listener);
-    if (!instantiate) {
-      // If we did not instantiate provider, need to add our listener on already-created WAL
-      // instance too (listeners are passed by provider to WAL instance on creation but if provider
-      // created already, our listener add above is missed). And add the current WAL file to the
-      // Replication Source so it can start replicating it.
-      WAL wal = walProvider.getWAL(regionInfo);
-      wal.registerWALActionsListener(listener);
-      crs.enqueueLog(((AbstractFSWAL)wal).getCurrentFileName());
-    }
-    return crs.startup();
-  }
-
   ReplicationQueueStorage getQueueStorage() {
     return queueStorage;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
index 5583a47..6a46bd4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
@@ -22,23 +22,15 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.Reference;
-import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Similar to {@link RegionReplicaUtil} but for the server side
@@ -46,8 +38,6 @@ import org.slf4j.LoggerFactory;
 @InterfaceAudience.Private
 public class ServerRegionReplicaUtil extends RegionReplicaUtil {
 
-  private static final Logger LOG = LoggerFactory.getLogger(ServerRegionReplicaUtil.class);
-
   /**
    * Whether asynchronous WAL replication to the secondary region replicas is enabled or not.
    * If this is enabled, a replication peer named "region_replica_replication" will be created
@@ -59,7 +49,6 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
   public static final String REGION_REPLICA_REPLICATION_CONF_KEY
     = "hbase.region.replica.replication.enabled";
   private static final boolean DEFAULT_REGION_REPLICA_REPLICATION = false;
-  public static final String REGION_REPLICA_REPLICATION_PEER = "region_replica_replication";
 
   /**
    * Same as for {@link #REGION_REPLICA_REPLICATION_CONF_KEY} but for catalog replication.
@@ -162,27 +151,6 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
   }
 
   /**
-   * Create replication peer for replicating user-space Region Read Replicas.
-   * This methods should only be called at master side.
-   */
-  public static void setupRegionReplicaReplication(MasterServices services)
-    throws IOException, ReplicationException {
-    if (!isRegionReplicaReplicationEnabled(services.getConfiguration())) {
-      return;
-    }
-    if (services.getReplicationPeerManager().getPeerConfig(REGION_REPLICA_REPLICATION_PEER)
-      .isPresent()) {
-      return;
-    }
-    LOG.info("Region replica replication peer id=" + REGION_REPLICA_REPLICATION_PEER +
-      " not exist. Creating...");
-    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
-      .setClusterKey(ZKConfig.getZooKeeperClusterKey(services.getConfiguration()))
-      .setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName()).build();
-    services.addReplicationPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig, true);
-  }
-
-  /**
    * @return True if Region Read Replica is enabled for <code>tn</code> (whether hbase:meta or
    *   user-space tables).
    */
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index 3c2bc3f..fe16c31 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -296,8 +296,8 @@ public class TestIOFencing {
         FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
         new Path("store_dir"));
       WALUtil.writeCompactionMarker(compactingRegion.getWAL(),
-          ((HRegion)compactingRegion).getReplicationScope(),
-        oldHri, compactionDescriptor, compactingRegion.getMVCC());
+        ((HRegion) compactingRegion).getReplicationScope(), oldHri, compactionDescriptor,
+        compactingRegion.getMVCC(), null);
 
       // Wait till flush has happened, otherwise there won't be multiple store files
       long startWaitTime = EnvironmentEdgeManager.currentTime();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
index 4b10110..ef3511c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
@@ -126,12 +126,6 @@ public class DummyAsyncClusterConnection implements AsyncClusterConnection {
   }
 
   @Override
-  public CompletableFuture<Long> replay(TableName tableName, byte[] encodedRegionName, byte[] row,
-      List<Entry> entries, int replicaId, int numRetries, long operationTimeoutNs) {
-    return null;
-  }
-
-  @Override
   public CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
       boolean reload) {
     return null;
@@ -169,4 +163,11 @@ public class DummyAsyncClusterConnection implements AsyncClusterConnection {
   public CompletableFuture<List<ServerName>> getAllBootstrapNodes(ServerName regionServer) {
     return null;
   }
+
+  @Override
+  public CompletableFuture<Void> replicate(RegionInfo replica,
+    List<Entry> entries, int numRetries, long rpcTimeoutNs,
+    long operationTimeoutNs) {
+    return null;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index a5584ff..187dca5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -996,7 +996,7 @@ public class TestHRegion {
             region.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
 
       WALUtil.writeCompactionMarker(region.getWAL(), this.region.getReplicationScope(),
-          this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
+          this.region.getRegionInfo(), compactionDescriptor, region.getMVCC(), null);
 
       Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java
index ea6589d..ba7e9d1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.client.Consistency;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.replication.regionserver.TestRegionReplicaReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.regionserver.TestRegionReplicaReplication;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
@@ -64,7 +64,7 @@ public class TestRegionReplicaFailover {
       HBaseClassTestRule.forClass(TestRegionReplicaFailover.class);
 
   private static final Logger LOG =
-      LoggerFactory.getLogger(TestRegionReplicaReplicationEndpoint.class);
+      LoggerFactory.getLogger(TestRegionReplicaReplication.class);
 
   private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java
similarity index 74%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java
index 5a06a11..b501ab2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java
@@ -20,14 +20,13 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Objects;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -36,7 +35,6 @@ import org.apache.hadoop.hbase.ClientMetaTableAccessor;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
@@ -45,7 +43,6 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
@@ -67,18 +64,17 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Tests RegionReplicaReplicationEndpoint class for hbase:meta by setting up region replicas and
- * verifying async wal replication replays the edits to the secondary region in various scenarios.
- *
- * @see TestRegionReplicaReplicationEndpoint
+ * Tests region replication for hbase:meta by setting up region replicas and verifying async wal
+ * replication replays the edits to the secondary region in various scenarios.
+ * @see TestRegionReplicaReplication
  */
-@Category({LargeTests.class})
-public class TestMetaRegionReplicaReplicationEndpoint {
+@Category({ LargeTests.class })
+public class TestMetaRegionReplicaReplication {
+
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestMetaRegionReplicaReplicationEndpoint.class);
-  private static final Logger LOG =
-    LoggerFactory.getLogger(TestMetaRegionReplicaReplicationEndpoint.class);
+    HBaseClassTestRule.forClass(TestMetaRegionReplicaReplication.class);
+  private static final Logger LOG = LoggerFactory.getLogger(TestMetaRegionReplicaReplication.class);
   private static final int NB_SERVERS = 4;
   private final HBaseTestingUtil HTU = new HBaseTestingUtil();
   private int numOfMetaReplica = NB_SERVERS - 1;
@@ -102,17 +98,15 @@ public class TestMetaRegionReplicaReplicationEndpoint {
     conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
     conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
     // Enable hbase:meta replication.
-    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY,
-      true);
+    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
     // Set hbase:meta replicas to be 3.
     // conf.setInt(HConstants.META_REPLICAS_NUM, numOfMetaReplica);
     HTU.startMiniCluster(NB_SERVERS);
     // Enable hbase:meta replication.
     HBaseTestingUtil.setReplicas(HTU.getAdmin(), TableName.META_TABLE_NAME, numOfMetaReplica);
 
-    HTU.waitFor(30000,
-      () -> HTU.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME).size()
-      >= numOfMetaReplica);
+    HTU.waitFor(30000, () -> HTU.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME)
+      .size() >= numOfMetaReplica);
   }
 
   @After
@@ -121,83 +115,19 @@ public class TestMetaRegionReplicaReplicationEndpoint {
   }
 
   /**
-   * Assert that the ReplicationSource for hbase:meta gets created when hbase:meta is opened.
-   */
-  @Test
-  public void testHBaseMetaReplicationSourceCreatedOnOpen() throws Exception {
-    SingleProcessHBaseCluster cluster = HTU.getMiniHBaseCluster();
-    HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta());
-    // Replicate a row to prove all working.
-    testHBaseMetaReplicatesOneRow(0);
-    assertTrue(isMetaRegionReplicaReplicationSource(hrs));
-    // Now move the hbase:meta and make sure the ReplicationSource is in both places.
-    HRegionServer hrsOther = null;
-    for (int i = 0; i < cluster.getNumLiveRegionServers(); i++) {
-      hrsOther = cluster.getRegionServer(i);
-      if (hrsOther.getServerName().equals(hrs.getServerName())) {
-        hrsOther = null;
-        continue;
-      }
-      break;
-    }
-    assertNotNull(hrsOther);
-    assertFalse(isMetaRegionReplicaReplicationSource(hrsOther));
-    Region meta = null;
-    for (Region region : hrs.getOnlineRegionsLocalContext()) {
-      if (region.getRegionInfo().isMetaRegion()) {
-        meta = region;
-        break;
-      }
-    }
-    assertNotNull(meta);
-    HTU.moveRegionAndWait(meta.getRegionInfo(), hrsOther.getServerName());
-    // Assert that there is a ReplicationSource in both places now.
-    assertTrue(isMetaRegionReplicaReplicationSource(hrs));
-    assertTrue(isMetaRegionReplicaReplicationSource(hrsOther));
-    // Replicate to show stuff still works.
-    testHBaseMetaReplicatesOneRow(1);
-    // Now pretend a few hours have gone by... roll the meta WAL in original location... Move the
-    // meta back and retry replication. See if it works.
-    hrs.getWAL(meta.getRegionInfo()).rollWriter(true);
-    testHBaseMetaReplicatesOneRow(2);
-    hrs.getWAL(meta.getRegionInfo()).rollWriter(true);
-    testHBaseMetaReplicatesOneRow(3);
-  }
-
-  /**
-   * Test meta region replica replication. Create some tables and see if replicas pick up the
-   * additions.
-   */
-  private void testHBaseMetaReplicatesOneRow(int i) throws Exception {
-    waitForMetaReplicasToOnline();
-    try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_" + i),
-      HConstants.CATALOG_FAMILY)) {
-      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
-    }
-  }
-
-  /**
-   * @return Whether the special meta region replica peer is enabled on <code>hrs</code>
-   */
-  private boolean isMetaRegionReplicaReplicationSource(HRegionServer hrs) {
-    return hrs.getReplicationSourceService().getReplicationManager().
-      catalogReplicationSource.get() != null;
-  }
-
-  /**
    * Test meta region replica replication. Create some tables and see if replicas pick up the
    * additions.
    */
   @Test
   public void testHBaseMetaReplicates() throws Exception {
-    try (Table table = HTU
-      .createTable(TableName.valueOf(this.name.getMethodName() + "_0"), HConstants.CATALOG_FAMILY,
-        Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
+    try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_0"),
+      HConstants.CATALOG_FAMILY,
+      Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
       verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
     }
-    try (Table table = HTU
-      .createTable(TableName.valueOf(this.name.getMethodName() + "_1"), HConstants.CATALOG_FAMILY,
-        Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
+    try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_1"),
+      HConstants.CATALOG_FAMILY,
+      Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
       verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
       // Try delete.
       HTU.deleteTableIfAny(table.getName());
@@ -207,26 +137,22 @@ public class TestMetaRegionReplicaReplicationEndpoint {
 
   @Test
   public void testCatalogReplicaReplicationWithFlushAndCompaction() throws Exception {
-    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
-    TableName tableName = TableName.valueOf("hbase:meta");
-    Table table = connection.getTable(tableName);
-    try {
+    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+      Table table = connection.getTable(TableName.META_TABLE_NAME)) {
       // load the data to the table
       for (int i = 0; i < 5; i++) {
         LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
         HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
         LOG.info("flushing table");
-        HTU.flush(tableName);
+        HTU.flush(TableName.META_TABLE_NAME);
         LOG.info("compacting table");
         if (i < 4) {
-          HTU.compact(tableName, false);
+          HTU.compact(TableName.META_TABLE_NAME, false);
         }
       }
 
-      verifyReplication(tableName, numOfMetaReplica, 0, 5000, HConstants.CATALOG_FAMILY);
-    } finally {
-      table.close();
-      connection.close();
+      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, 0, 5000,
+        HConstants.CATALOG_FAMILY);
     }
   }
 
@@ -235,7 +161,6 @@ public class TestMetaRegionReplicaReplicationEndpoint {
     SingleProcessHBaseCluster cluster = HTU.getMiniHBaseCluster();
     HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta());
 
-    HRegionServer hrsMetaReplica = null;
     HRegionServer hrsNoMetaReplica = null;
     HRegionServer server = null;
     Region metaReplica = null;
@@ -260,11 +185,8 @@ public class TestMetaRegionReplicaReplicationEndpoint {
         hrsNoMetaReplica = server;
       }
     }
-
-    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
-    TableName tableName = TableName.valueOf("hbase:meta");
-    Table table = connection.getTable(tableName);
-    try {
+    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+      Table table = connection.getTable(TableName.META_TABLE_NAME)) {
       // load the data to the table
       for (int i = 0; i < 5; i++) {
         LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
@@ -274,10 +196,8 @@ public class TestMetaRegionReplicaReplicationEndpoint {
         }
       }
 
-      verifyReplication(tableName, numOfMetaReplica, 0, 5000, HConstants.CATALOG_FAMILY);
-    } finally {
-      table.close();
-      connection.close();
+      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, 0, 5000,
+        HConstants.CATALOG_FAMILY);
     }
   }
 
@@ -324,22 +244,6 @@ public class TestMetaRegionReplicaReplicationEndpoint {
   }
 
   /**
-   * Replicas come online after primary.
-   */
-  private void waitForMetaReplicasToOnline() throws IOException {
-    final RegionLocator regionLocator =
-      HTU.getConnection().getRegionLocator(TableName.META_TABLE_NAME);
-    HTU.waitFor(10000,
-      // getRegionLocations returns an entry for each replica but if unassigned, entry is null.
-      // Pass reload to force us to skip cache else it just keeps returning default.
-      () -> regionLocator.getRegionLocations(HConstants.EMPTY_START_ROW, true).stream().
-        filter(Objects::nonNull).count() >= numOfMetaReplica);
-    List<HRegionLocation> locations = regionLocator.getRegionLocations(HConstants.EMPTY_START_ROW);
-    LOG.info("Found locations {}", locations);
-    assertEquals(numOfMetaReplica, locations.size());
-  }
-
-  /**
    * Scan hbase:meta for <code>tableName</code> content.
    */
   private List<Result> getMetaCells(TableName tableName) throws IOException {
@@ -373,20 +277,9 @@ public class TestMetaRegionReplicaReplicationEndpoint {
     return regions;
   }
 
-  private Region getOneRegion(TableName tableName) {
-    for (int i = 0; i < NB_SERVERS; i++) {
-      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
-      List<HRegion> onlineRegions = rs.getRegions(tableName);
-      if (onlineRegions.size() > 1) {
-        return onlineRegions.get(0);
-      }
-    }
-    return null;
-  }
-
   /**
-   * Verify when a Table is deleted from primary, then there are no references in replicas
-   * (because they get the delete of the table rows too).
+   * Verify when a Table is deleted from primary, then there are no references in replicas (because
+   * they get the delete of the table rows too).
    */
   private void verifyDeletedReplication(TableName tableName, int regionReplication,
     final TableName deletedTableName) {
@@ -417,8 +310,8 @@ public class TestMetaRegionReplicaReplicationEndpoint {
   }
 
   /**
-   * Cells are from hbase:meta replica so will start w/ 'tableName,'; i.e. the tablename followed
-   * by HConstants.DELIMITER. Make sure the deleted table is no longer present in passed
+   * Cells are from hbase:meta replica so will start w/ 'tableName,'; i.e. the tablename followed by
+   * HConstants.DELIMITER. Make sure the deleted table is no longer present in passed
    * <code>cells</code>.
    */
   private boolean doesNotContain(List<Cell> cells, TableName tableName) {
@@ -491,21 +384,19 @@ public class TestMetaRegionReplicaReplicationEndpoint {
   }
 
   private void primaryNoChangeReplicaIncrease(final long[] before, final long[] after) {
-    assertEquals(before[RegionInfo.DEFAULT_REPLICA_ID],
-      after[RegionInfo.DEFAULT_REPLICA_ID]);
+    assertEquals(before[RegionInfo.DEFAULT_REPLICA_ID], after[RegionInfo.DEFAULT_REPLICA_ID]);
 
-    for (int i = 1; i < after.length; i ++) {
+    for (int i = 1; i < after.length; i++) {
       assertTrue(after[i] > before[i]);
     }
   }
 
   private void primaryIncreaseReplicaNoChange(final long[] before, final long[] after) {
     // There are read requests increase for primary meta replica.
-    assertTrue(after[RegionInfo.DEFAULT_REPLICA_ID] >
-      before[RegionInfo.DEFAULT_REPLICA_ID]);
+    assertTrue(after[RegionInfo.DEFAULT_REPLICA_ID] > before[RegionInfo.DEFAULT_REPLICA_ID]);
 
     // No change for replica regions
-    for (int i = 1; i < after.length; i ++) {
+    for (int i = 1; i < after.length; i++) {
       assertEquals(before[i], after[i]);
     }
   }
@@ -515,13 +406,12 @@ public class TestMetaRegionReplicaReplicationEndpoint {
     for (Region r : metaRegions) {
       LOG.info("read request for region {} is {}", r, r.getReadRequestsCount());
       counters[i] = r.getReadRequestsCount();
-      i ++;
+      i++;
     }
   }
 
   @Test
   public void testHBaseMetaReplicaGets() throws Exception {
-
     TableName tn = TableName.valueOf(this.name.getMethodName());
     final Region[] metaRegions = getAllRegions(TableName.META_TABLE_NAME, numOfMetaReplica);
     long[] readReqsForMetaReplicas = new long[numOfMetaReplica];
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
new file mode 100644
index 0000000..7dd4255
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.testclassification.FlakeyTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
+
+/**
+ * Tests region replication by setting up region replicas and verifying that async wal
+ * replication replays the edits to the secondary regions in various scenarios.
+ */
+@Category({FlakeyTests.class, LargeTests.class})
+public class TestRegionReplicaReplication {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestRegionReplicaReplication.class);
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestRegionReplicaReplication.class);
+
+  private static final int NB_SERVERS = 2;
+
+  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
+
+  @Rule
+  public TestName name = new TestName();
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    Configuration conf = HTU.getConfiguration();
+    conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
+    conf.setInt("replication.source.size.capacity", 10240);
+    conf.setLong("replication.source.sleepforretries", 100);
+    conf.setInt("hbase.regionserver.maxlogs", 10);
+    conf.setLong("hbase.master.logcleaner.ttl", 10);
+    conf.setInt("zookeeper.recovery.retry", 1);
+    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
+    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
+    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+    conf.setInt("replication.stats.thread.period.seconds", 5);
+    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // fewer retries are needed
+    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
+
+    HTU.startMiniCluster(NB_SERVERS);
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    HTU.shutdownMiniCluster();
+  }
+
+  private void testRegionReplicaReplication(int regionReplication) throws Exception {
+    // Test region replica replication: create a table with a single region, write some data,
+    // and ensure that the data is replicated to the secondary regions.
+    TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_"
+        + regionReplication);
+    TableDescriptor htd = HTU
+      .createModifyableTableDescriptor(TableName.valueOf(tableName.toString()),
+        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
+        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
+      .setRegionReplication(regionReplication).build();
+    createOrEnableTableWithRetries(htd, true);
+    TableName tableNameNoReplicas =
+        TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS");
+    HTU.deleteTableIfAny(tableNameNoReplicas);
+    HTU.createTable(tableNameNoReplicas, HBaseTestingUtil.fam1);
+
+    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+      Table table = connection.getTable(tableName);
+      Table tableNoReplicas = connection.getTable(tableNameNoReplicas)) {
+      // load some data to the non-replicated table
+      HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtil.fam1, 6000, 7000);
+
+      // load the data to the table
+      HTU.loadNumericRows(table, HBaseTestingUtil.fam1, 0, 1000);
+
+      verifyReplication(tableName, regionReplication, 0, 1000);
+    } finally {
+      HTU.deleteTableIfAny(tableNameNoReplicas);
+    }
+  }
+
+  private void verifyReplication(TableName tableName, int regionReplication,
+      final int startRow, final int endRow) throws Exception {
+    verifyReplication(tableName, regionReplication, startRow, endRow, true);
+  }
+
+  private void verifyReplication(TableName tableName, int regionReplication,
+      final int startRow, final int endRow, final boolean present) throws Exception {
+    // find the regions
+    final Region[] regions = new Region[regionReplication];
+
+    for (int i=0; i < NB_SERVERS; i++) {
+      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
+      List<HRegion> onlineRegions = rs.getRegions(tableName);
+      for (HRegion region : onlineRegions) {
+        regions[region.getRegionInfo().getReplicaId()] = region;
+      }
+    }
+
+    for (Region region : regions) {
+      assertNotNull(region);
+    }
+
+    for (int i = 1; i < regionReplication; i++) {
+      final Region region = regions[i];
+      // wait until all the data is replicated to all secondary regions
+      Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          LOG.info("verifying replication for region replica:" + region.getRegionInfo());
+          try {
+            HTU.verifyNumericRows(region, HBaseTestingUtil.fam1, startRow, endRow, present);
+          } catch(Throwable ex) {
+            LOG.warn("Verification from secondary region is not complete yet", ex);
+            // still wait
+            return false;
+          }
+          return true;
+        }
+      });
+    }
+  }
+
+  @Test
+  public void testRegionReplicaReplicationWith2Replicas() throws Exception {
+    testRegionReplicaReplication(2);
+  }
+
+  @Test
+  public void testRegionReplicaReplicationWith3Replicas() throws Exception {
+    testRegionReplicaReplication(3);
+  }
+
+  @Test
+  public void testRegionReplicaReplicationWith10Replicas() throws Exception {
+    testRegionReplicaReplication(10);
+  }
+
+  @Test
+  public void testRegionReplicaWithoutMemstoreReplication() throws Exception {
+    int regionReplication = 3;
+    TableDescriptor htd = HTU.createModifyableTableDescriptor(name.getMethodName())
+      .setRegionReplication(regionReplication).setRegionMemStoreReplication(false).build();
+    createOrEnableTableWithRetries(htd, true);
+    final TableName tableName = htd.getTableName();
+    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+    Table table = connection.getTable(tableName);
+    try {
+      // write data to the primary. The replicas should not receive the data
+      final int STEP = 100;
+      for (int i = 0; i < 3; ++i) {
+        final int startRow = i * STEP;
+        final int endRow = (i + 1) * STEP;
+        LOG.info("Writing data from " + startRow + " to " + endRow);
+        HTU.loadNumericRows(table, HBaseTestingUtil.fam1, startRow, endRow);
+        verifyReplication(tableName, regionReplication, startRow, endRow, false);
+
+        // Flush the table, now the data should show up in the replicas
+        LOG.info("flushing table");
+        HTU.flush(tableName);
+        verifyReplication(tableName, regionReplication, 0, endRow, true);
+      }
+    } finally {
+      table.close();
+      connection.close();
+    }
+  }
+
+  @Test
+  public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception {
+    // Tests a table with region replication 3. Writes rows 0 to 6000 in batches of 1000,
+    // flushing and compacting after each batch, then verifies that the 0 to 1000 range is
+    // readable from the replicas. Note that this does not test whether the replicas actually
+    // pick up flushed files and apply compactions to their stores.
+    int regionReplication = 3;
+    TableDescriptor htd = HTU.createModifyableTableDescriptor(name.getMethodName())
+      .setRegionReplication(regionReplication).build();
+    createOrEnableTableWithRetries(htd, true);
+    final TableName tableName = htd.getTableName();
+
+    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+    Table table = connection.getTable(tableName);
+    try {
+      // load the data to the table
+
+      for (int i = 0; i < 6000; i += 1000) {
+        LOG.info("Writing data from " + i + " to " + (i+1000));
+        HTU.loadNumericRows(table, HBaseTestingUtil.fam1, i, i+1000);
+        LOG.info("flushing table");
+        HTU.flush(tableName);
+        LOG.info("compacting table");
+        HTU.compact(tableName, false);
+      }
+
+      verifyReplication(tableName, regionReplication, 0, 1000);
+    } finally {
+      table.close();
+      connection.close();
+    }
+  }
+
+  private void createOrEnableTableWithRetries(TableDescriptor htd, boolean createTableOperation) {
+    // Helper function to run create/enable table operations with a retry feature
+    boolean continueToRetry = true;
+    int tries = 0;
+    while (continueToRetry && tries < 50) {
+      try {
+        continueToRetry = false;
+        if (createTableOperation) {
+          HTU.getAdmin().createTable(htd);
+        } else {
+          HTU.getAdmin().enableTable(htd.getTableName());
+        }
+      } catch (IOException e) {
+        if (e.getCause() instanceof ReplicationException) {
+          continueToRetry = true;
+          tries++;
+          Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+        }
+      }
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
deleted file mode 100644
index d238e09..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
+++ /dev/null
@@ -1,515 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.Cell.Type;
-import org.apache.hadoop.hbase.CellBuilderFactory;
-import org.apache.hadoop.hbase.CellBuilderType;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.testclassification.FlakeyTests;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.WALKeyImpl;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
-
-/**
- * Tests RegionReplicaReplicationEndpoint class by setting up region replicas and verifying
- * async wal replication replays the edits to the secondary region in various scenarios.
- */
-@Category({FlakeyTests.class, LargeTests.class})
-public class TestRegionReplicaReplicationEndpoint {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestRegionReplicaReplicationEndpoint.class);
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestRegionReplicaReplicationEndpoint.class);
-
-  private static final int NB_SERVERS = 2;
-
-  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
-
-  @Rule
-  public TestName name = new TestName();
-
-  @BeforeClass
-  public static void beforeClass() throws Exception {
-    Configuration conf = HTU.getConfiguration();
-    conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
-    conf.setInt("replication.source.size.capacity", 10240);
-    conf.setLong("replication.source.sleepforretries", 100);
-    conf.setInt("hbase.regionserver.maxlogs", 10);
-    conf.setLong("hbase.master.logcleaner.ttl", 10);
-    conf.setInt("zookeeper.recovery.retry", 1);
-    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
-    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
-    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
-    conf.setInt("replication.stats.thread.period.seconds", 5);
-    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
-    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // less number of retries is needed
-    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
-
-    HTU.startMiniCluster(NB_SERVERS);
-  }
-
-  @AfterClass
-  public static void afterClass() throws Exception {
-    HTU.shutdownMiniCluster();
-  }
-
-  @Test
-  public void testRegionReplicaReplicationPeerIsCreated() throws IOException {
-    // create a table with region replicas. Check whether the replication peer is created
-    // and replication started.
-    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
-      Admin admin = connection.getAdmin()) {
-      String peerId = ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER;
-
-      ReplicationPeerConfig peerConfig = null;
-      try {
-        peerConfig = admin.getReplicationPeerConfig(peerId);
-      } catch (ReplicationPeerNotFoundException e) {
-        LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
-      }
-
-      if (peerConfig != null) {
-        admin.removeReplicationPeer(peerId);
-        peerConfig = null;
-      }
-
-      TableDescriptor htd = HTU
-        .createTableDescriptor(TableName.valueOf("testReplicationPeerIsCreated_no_region_replicas"),
-          ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
-          ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
-      createOrEnableTableWithRetries(htd, true);
-      try {
-        peerConfig = admin.getReplicationPeerConfig(peerId);
-        fail("Should throw ReplicationException, because replication peer id=" + peerId
-          + " not exist");
-      } catch (ReplicationPeerNotFoundException e) {
-      }
-      assertNull(peerConfig);
-
-      htd = HTU.createModifyableTableDescriptor(TableName.valueOf("testReplicationPeerIsCreated"),
-        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
-        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED).setRegionReplication(2).build();
-      createOrEnableTableWithRetries(htd, true);
-
-      // assert peer configuration is correct
-      peerConfig = admin.getReplicationPeerConfig(peerId);
-      assertNotNull(peerConfig);
-      assertEquals(peerConfig.getClusterKey(),
-        ZKConfig.getZooKeeperClusterKey(HTU.getConfiguration()));
-      assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
-        peerConfig.getReplicationEndpointImpl());
-    }
-  }
-
-  @Test
-  public void testRegionReplicaReplicationPeerIsCreatedForModifyTable() throws Exception {
-    // modify a table by adding region replicas. Check whether the replication peer is created
-    // and replication started.
-    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
-      Admin admin = connection.getAdmin()) {
-      String peerId = "region_replica_replication";
-
-      ReplicationPeerConfig peerConfig = null;
-      try {
-        peerConfig = admin.getReplicationPeerConfig(peerId);
-      } catch (ReplicationPeerNotFoundException e) {
-        LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
-      }
-
-      if (peerConfig != null) {
-        admin.removeReplicationPeer(peerId);
-        peerConfig = null;
-      }
-
-      TableDescriptor htd = HTU.createTableDescriptor(
-        TableName.valueOf("testRegionReplicaReplicationPeerIsCreatedForModifyTable"),
-        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
-        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
-      createOrEnableTableWithRetries(htd, true);
-
-      // assert that replication peer is not created yet
-      try {
-        peerConfig = admin.getReplicationPeerConfig(peerId);
-        fail("Should throw ReplicationException, because replication peer id=" + peerId
-          + " not exist");
-      } catch (ReplicationPeerNotFoundException e) {
-      }
-      assertNull(peerConfig);
-
-      HTU.getAdmin().disableTable(htd.getTableName());
-      htd = TableDescriptorBuilder.newBuilder(htd).setRegionReplication(2).build();
-      HTU.getAdmin().modifyTable(htd);
-      createOrEnableTableWithRetries(htd, false);
-
-      // assert peer configuration is correct
-      peerConfig = admin.getReplicationPeerConfig(peerId);
-      assertNotNull(peerConfig);
-      assertEquals(peerConfig.getClusterKey(),
-        ZKConfig.getZooKeeperClusterKey(HTU.getConfiguration()));
-      assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
-        peerConfig.getReplicationEndpointImpl());
-    }
-  }
-
-  public void testRegionReplicaReplication(int regionReplication) throws Exception {
-    // test region replica replication. Create a table with single region, write some data
-    // ensure that data is replicated to the secondary region
-    TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_"
-        + regionReplication);
-    TableDescriptor htd = HTU
-      .createModifyableTableDescriptor(TableName.valueOf(tableName.toString()),
-        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
-        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
-      .setRegionReplication(regionReplication).build();
-    createOrEnableTableWithRetries(htd, true);
-    TableName tableNameNoReplicas =
-        TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS");
-    HTU.deleteTableIfAny(tableNameNoReplicas);
-    HTU.createTable(tableNameNoReplicas, HBaseTestingUtil.fam1);
-
-    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
-    Table table = connection.getTable(tableName);
-    Table tableNoReplicas = connection.getTable(tableNameNoReplicas);
-
-    try {
-      // load some data to the non-replicated table
-      HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtil.fam1, 6000, 7000);
-
-      // load the data to the table
-      HTU.loadNumericRows(table, HBaseTestingUtil.fam1, 0, 1000);
-
-      verifyReplication(tableName, regionReplication, 0, 1000);
-
-    } finally {
-      table.close();
-      tableNoReplicas.close();
-      HTU.deleteTableIfAny(tableNameNoReplicas);
-      connection.close();
-    }
-  }
-
-  private void verifyReplication(TableName tableName, int regionReplication,
-      final int startRow, final int endRow) throws Exception {
-    verifyReplication(tableName, regionReplication, startRow, endRow, true);
-  }
-
-  private void verifyReplication(TableName tableName, int regionReplication,
-      final int startRow, final int endRow, final boolean present) throws Exception {
-    // find the regions
-    final Region[] regions = new Region[regionReplication];
-
-    for (int i=0; i < NB_SERVERS; i++) {
-      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
-      List<HRegion> onlineRegions = rs.getRegions(tableName);
-      for (HRegion region : onlineRegions) {
-        regions[region.getRegionInfo().getReplicaId()] = region;
-      }
-    }
-
-    for (Region region : regions) {
-      assertNotNull(region);
-    }
-
-    for (int i = 1; i < regionReplication; i++) {
-      final Region region = regions[i];
-      // wait until all the data is replicated to all secondary regions
-      Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          LOG.info("verifying replication for region replica:" + region.getRegionInfo());
-          try {
-            HTU.verifyNumericRows(region, HBaseTestingUtil.fam1, startRow, endRow, present);
-          } catch(Throwable ex) {
-            LOG.warn("Verification from secondary region is not complete yet", ex);
-            // still wait
-            return false;
-          }
-          return true;
-        }
-      });
-    }
-  }
-
-  @Test
-  public void testRegionReplicaReplicationWith2Replicas() throws Exception {
-    testRegionReplicaReplication(2);
-  }
-
-  @Test
-  public void testRegionReplicaReplicationWith3Replicas() throws Exception {
-    testRegionReplicaReplication(3);
-  }
-
-  @Test
-  public void testRegionReplicaReplicationWith10Replicas() throws Exception {
-    testRegionReplicaReplication(10);
-  }
-
-  @Test
-  public void testRegionReplicaWithoutMemstoreReplication() throws Exception {
-    int regionReplication = 3;
-    TableDescriptor htd = HTU.createModifyableTableDescriptor(name.getMethodName())
-      .setRegionReplication(regionReplication).setRegionMemStoreReplication(false).build();
-    createOrEnableTableWithRetries(htd, true);
-    final TableName tableName = htd.getTableName();
-    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
-    Table table = connection.getTable(tableName);
-    try {
-      // write data to the primary. The replicas should not receive the data
-      final int STEP = 100;
-      for (int i = 0; i < 3; ++i) {
-        final int startRow = i * STEP;
-        final int endRow = (i + 1) * STEP;
-        LOG.info("Writing data from " + startRow + " to " + endRow);
-        HTU.loadNumericRows(table, HBaseTestingUtil.fam1, startRow, endRow);
-        verifyReplication(tableName, regionReplication, startRow, endRow, false);
-
-        // Flush the table, now the data should show up in the replicas
-        LOG.info("flushing table");
-        HTU.flush(tableName);
-        verifyReplication(tableName, regionReplication, 0, endRow, true);
-      }
-    } finally {
-      table.close();
-      connection.close();
-    }
-  }
-
-  @Test
-  public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception {
-    // Tests a table with region replication 3. Writes some data, and causes flushes and
-    // compactions. Verifies that the data is readable from the replicas. Note that this
-    // does not test whether the replicas actually pick up flushed files and apply compaction
-    // to their stores
-    int regionReplication = 3;
-    TableDescriptor htd = HTU.createModifyableTableDescriptor(name.getMethodName())
-      .setRegionReplication(regionReplication).build();
-    createOrEnableTableWithRetries(htd, true);
-    final TableName tableName = htd.getTableName();
-
-    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
-    Table table = connection.getTable(tableName);
-    try {
-      // load the data to the table
-
-      for (int i = 0; i < 6000; i += 1000) {
-        LOG.info("Writing data from " + i + " to " + (i+1000));
-        HTU.loadNumericRows(table, HBaseTestingUtil.fam1, i, i+1000);
-        LOG.info("flushing table");
-        HTU.flush(tableName);
-        LOG.info("compacting table");
-        HTU.compact(tableName, false);
-      }
-
-      verifyReplication(tableName, regionReplication, 0, 1000);
-    } finally {
-      table.close();
-      connection.close();
-    }
-  }
-
-  @Test
-  public void testRegionReplicaReplicationIgnoresDisabledTables() throws Exception {
-    testRegionReplicaReplicationIgnores(false, false);
-  }
-
-  @Test
-  public void testRegionReplicaReplicationIgnoresDroppedTables() throws Exception {
-    testRegionReplicaReplicationIgnores(true, false);
-  }
-
-  @Test
-  public void testRegionReplicaReplicationIgnoresNonReplicatedTables() throws Exception {
-    testRegionReplicaReplicationIgnores(false, true);
-  }
-
-  private void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication)
-      throws Exception {
-    // tests having edits from a disabled or dropped table is handled correctly by skipping those
-    // entries and further edits after the edits from dropped/disabled table can be replicated
-    // without problems.
-    int regionReplication = 3;
-    TableDescriptor htd = HTU
-      .createModifyableTableDescriptor(
-        name.getMethodName() + "_drop_" + dropTable + "_disabledReplication_" + disableReplication)
-      .setRegionReplication(regionReplication).build();
-    final TableName tableName = htd.getTableName();
-    HTU.deleteTableIfAny(tableName);
-
-    createOrEnableTableWithRetries(htd, true);
-    TableName toBeDisabledTable = TableName.valueOf(
-      dropTable ? "droppedTable" : (disableReplication ? "disableReplication" : "disabledTable"));
-    HTU.deleteTableIfAny(toBeDisabledTable);
-    htd = HTU
-      .createModifyableTableDescriptor(toBeDisabledTable,
-        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
-        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
-      .setRegionReplication(regionReplication).build();
-    createOrEnableTableWithRetries(htd, true);
-
-    // both tables are created, now pause replication
-    HTU.getAdmin().disableReplicationPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
-
-    // now that the replication is disabled, write to the table to be dropped, then drop the table.
-
-    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
-    Table table = connection.getTable(tableName);
-    Table tableToBeDisabled = connection.getTable(toBeDisabledTable);
-
-    HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtil.fam1, 6000, 7000);
-
-    RegionLocator rl = connection.getRegionLocator(toBeDisabledTable);
-    HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
-    byte[] encodedRegionName = hrl.getRegion().getEncodedNameAsBytes();
-
-    Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("A"))
-        .setFamily(HTU.fam1).setValue(Bytes.toBytes("VAL")).setType(Type.Put).build();
-    Entry entry = new Entry(
-      new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1),
-        new WALEdit()
-            .add(cell));
-    HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
-    if (dropTable) {
-      HTU.getAdmin().deleteTable(toBeDisabledTable);
-    } else if (disableReplication) {
-      htd =
-        TableDescriptorBuilder.newBuilder(htd).setRegionReplication(regionReplication - 2).build();
-      HTU.getAdmin().modifyTable(htd);
-      createOrEnableTableWithRetries(htd, false);
-    }
-
-    HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(0);
-    MetricsSource metrics = mock(MetricsSource.class);
-    ReplicationEndpoint.Context ctx =
-      new ReplicationEndpoint.Context(rs, HTU.getConfiguration(), HTU.getConfiguration(),
-        HTU.getTestFileSystem(), ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER,
-        UUID.fromString(rs.getClusterId()), rs.getReplicationSourceService().getReplicationPeers()
-          .getPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER),
-        metrics, rs.getTableDescriptors(), rs);
-    RegionReplicaReplicationEndpoint rrpe = new RegionReplicaReplicationEndpoint();
-    rrpe.init(ctx);
-    rrpe.start();
-    ReplicationEndpoint.ReplicateContext repCtx = new ReplicationEndpoint.ReplicateContext();
-    repCtx.setEntries(Lists.newArrayList(entry, entry));
-    assertTrue(rrpe.replicate(repCtx));
-    verify(metrics, times(1)).incrLogEditsFiltered(eq(2L));
-    rrpe.stop();
-    if (disableReplication) {
-      // enable replication again so that we can verify replication
-      HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
-      htd = TableDescriptorBuilder.newBuilder(htd).setRegionReplication(regionReplication).build();
-      HTU.getAdmin().modifyTable(htd);
-      createOrEnableTableWithRetries(htd, false);
-    }
-
-    try {
-      // load some data to the to-be-dropped table
-      // load the data to the table
-      HTU.loadNumericRows(table, HBaseTestingUtil.fam1, 0, 1000);
-
-      // now enable the replication
-      HTU.getAdmin().enableReplicationPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
-
-      verifyReplication(tableName, regionReplication, 0, 1000);
-    } finally {
-      table.close();
-      rl.close();
-      tableToBeDisabled.close();
-      HTU.deleteTableIfAny(toBeDisabledTable);
-      connection.close();
-    }
-  }
-
-  private void createOrEnableTableWithRetries(TableDescriptor htd, boolean createTableOperation) {
-    // Helper function to run create/enable table operations with a retry feature
-    boolean continueToRetry = true;
-    int tries = 0;
-    while (continueToRetry && tries < 50) {
-      try {
-        continueToRetry = false;
-        if (createTableOperation) {
-          HTU.getAdmin().createTable(htd);
-        } else {
-          HTU.getAdmin().enableTable(htd.getTableName());
-        }
-      } catch (IOException e) {
-        if (e.getCause() instanceof ReplicationException) {
-          continueToRetry = true;
-          tries++;
-          Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
-        }
-      }
-    }
-  }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
deleted file mode 100644
index a37ab5f..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
-import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Optional;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.StartTestingClusterOption;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.AsyncClusterConnection;
-import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.WALCoprocessor;
-import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.WALObserver;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
-import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.wal.WALKeyImpl;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-
-/**
- * Tests RegionReplicaReplicationEndpoint. Unlike TestRegionReplicaReplicationEndpoint this
- * class contains lower level tests using callables.
- */
-@Category({ReplicationTests.class, MediumTests.class})
-public class TestRegionReplicaReplicationEndpointNoMaster {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestRegionReplicaReplicationEndpointNoMaster.class);
-
-  private static final int NB_SERVERS = 2;
-  private static TableName tableName = TableName.valueOf(
-    TestRegionReplicaReplicationEndpointNoMaster.class.getSimpleName());
-  private static Table table;
-  private static final byte[] row = Bytes.toBytes("TestRegionReplicaReplicator");
-
-  private static HRegionServer rs0;
-  private static HRegionServer rs1;
-
-  private static RegionInfo hriPrimary;
-  private static RegionInfo hriSecondary;
-
-  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
-  private static final byte[] f = HConstants.CATALOG_FAMILY;
-
-  @BeforeClass
-  public static void beforeClass() throws Exception {
-    Configuration conf = HTU.getConfiguration();
-    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
-    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false);
-
-    // install WALObserver coprocessor for tests
-    String walCoprocs = HTU.getConfiguration().get(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY);
-    if (walCoprocs == null) {
-      walCoprocs = WALEditCopro.class.getName();
-    } else {
-      walCoprocs += "," + WALEditCopro.class.getName();
-    }
-    HTU.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
-      walCoprocs);
-    StartTestingClusterOption option = StartTestingClusterOption.builder()
-      .numAlwaysStandByMasters(1).numRegionServers(NB_SERVERS).numDataNodes(NB_SERVERS).build();
-    HTU.startMiniCluster(option);
-
-    // Create table then get the single region for our new table.
-    TableDescriptor htd = HTU.createTableDescriptor(TableName.valueOf(tableName.getNameAsString()),
-      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
-      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
-    table = HTU.createTable(htd, new byte[][]{f}, null);
-
-    try (RegionLocator locator = HTU.getConnection().getRegionLocator(tableName)) {
-      hriPrimary = locator.getRegionLocation(row, false).getRegion();
-    }
-
-    // mock a secondary region info to open
-    hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);
-
-    // No master
-    TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
-    rs0 = HTU.getMiniHBaseCluster().getRegionServer(0);
-    rs1 = HTU.getMiniHBaseCluster().getRegionServer(1);
-  }
-
-  @AfterClass
-  public static void afterClass() throws Exception {
-    HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
-    table.close();
-    HTU.shutdownMiniCluster();
-  }
-
-  @Before
-  public void before() throws Exception {
-    entries.clear();
-  }
-
-  @After
-  public void after() throws Exception {
-  }
-
-  static ConcurrentLinkedQueue<Entry> entries = new ConcurrentLinkedQueue<>();
-
-  public static class WALEditCopro implements WALCoprocessor, WALObserver {
-    public WALEditCopro() {
-      entries.clear();
-    }
-
-    @Override
-    public Optional<WALObserver> getWALObserver() {
-      return Optional.of(this);
-    }
-
-    @Override
-    public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
-                             RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
-      // only keep primary region's edits
-      if (logKey.getTableName().equals(tableName) && info.getReplicaId() == 0) {
-        // Presume type is a WALKeyImpl
-        entries.add(new Entry((WALKeyImpl)logKey, logEdit));
-      }
-    }
-  }
-
-  @Test
-  public void testReplayCallable() throws Exception {
-    // tests replaying the edits to a secondary region replica using the Callable directly
-    openRegion(HTU, rs0, hriSecondary);
-
-    // load some data to primary
-    HTU.loadNumericRows(table, f, 0, 1000);
-
-    Assert.assertEquals(1000, entries.size());
-    try (AsyncClusterConnection conn = ClusterConnectionFactory
-      .createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) {
-      // replay the edits to the secondary using replay callable
-      replicateUsingCallable(conn, entries);
-    }
-
-    Region region = rs0.getRegion(hriSecondary.getEncodedName());
-    HTU.verifyNumericRows(region, f, 0, 1000);
-
-    HTU.deleteNumericRows(table, f, 0, 1000);
-    closeRegion(HTU, rs0, hriSecondary);
-  }
-
-  private void replicateUsingCallable(AsyncClusterConnection connection, Queue<Entry> entries)
-      throws IOException, ExecutionException, InterruptedException {
-    Entry entry;
-    while ((entry = entries.poll()) != null) {
-      byte[] row = CellUtil.cloneRow(entry.getEdit().getCells().get(0));
-      RegionLocations locations = connection.getRegionLocations(tableName, row, true).get();
-      connection
-        .replay(tableName, locations.getRegionLocation(1).getRegion().getEncodedNameAsBytes(), row,
-          Collections.singletonList(entry), 1, Integer.MAX_VALUE, TimeUnit.SECONDS.toNanos(10))
-        .get();
-    }
-  }
-
-  @Test
-  public void testReplayCallableWithRegionMove() throws Exception {
-    // tests replaying the edits to a secondary region replica using the Callable directly while
-    // the region is moved to another location.It tests handling of RME.
-    try (AsyncClusterConnection conn = ClusterConnectionFactory
-      .createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) {
-      openRegion(HTU, rs0, hriSecondary);
-      // load some data to primary
-      HTU.loadNumericRows(table, f, 0, 1000);
-
-      Assert.assertEquals(1000, entries.size());
-
-      // replay the edits to the secondary using replay callable
-      replicateUsingCallable(conn, entries);
-
-      Region region = rs0.getRegion(hriSecondary.getEncodedName());
-      HTU.verifyNumericRows(region, f, 0, 1000);
-
-      HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary
-
-      // move the secondary region from RS0 to RS1
-      closeRegion(HTU, rs0, hriSecondary);
-      openRegion(HTU, rs1, hriSecondary);
-
-      // replicate the new data
-      replicateUsingCallable(conn, entries);
-
-      region = rs1.getRegion(hriSecondary.getEncodedName());
-      // verify the new data. old data may or may not be there
-      HTU.verifyNumericRows(region, f, 1000, 2000);
-
-      HTU.deleteNumericRows(table, f, 0, 2000);
-      closeRegion(HTU, rs1, hriSecondary);
-    }
-  }
-
-  @Test
-  public void testRegionReplicaReplicationEndpointReplicate() throws Exception {
-    // tests replaying the edits to a secondary region replica using the RRRE.replicate()
-    openRegion(HTU, rs0, hriSecondary);
-    RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint();
-
-    ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
-    when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
-    when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
-    when(context.getServer()).thenReturn(rs0);
-    when(context.getTableDescriptors()).thenReturn(rs0.getTableDescriptors());
-    replicator.init(context);
-    replicator.startAsync();
-
-    //load some data to primary
-    HTU.loadNumericRows(table, f, 0, 1000);
-
-    Assert.assertEquals(1000, entries.size());
-    // replay the edits to the secondary using replay callable
-    final String fakeWalGroupId = "fakeWALGroup";
-    replicator.replicate(new ReplicateContext().setEntries(Lists.newArrayList(entries))
-        .setWalGroupId(fakeWalGroupId));
-    replicator.stop();
-    Region region = rs0.getRegion(hriSecondary.getEncodedName());
-    HTU.verifyNumericRows(region, f, 0, 1000);
-
-    HTU.deleteNumericRows(table, f, 0, 1000);
-    closeRegion(HTU, rs0, hriSecondary);
-  }
-}