Posted to commits@hbase.apache.org by zh...@apache.org on 2021/11/21 15:55:30 UTC

[hbase] branch HBASE-26233 updated (aa3d0b4 -> 116f462)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a change to branch HBASE-26233
in repository https://gitbox.apache.org/repos/asf/hbase.git.


    omit aa3d0b4  HBASE-26457 Should not always clear all the failed replicas when getting a flush all request (#3850)
    omit 42ec40c  HBASE-26448 Make sure we do not flush a region too frequently (#3847)
    omit 086401a  HBASE-26449 The way we add or clear failedReplicas may have race (#3846)
    omit e83e36d  HBASE-26413 Limit the total size of buffered region replication entries (#3844)
    omit e261a54  HBASE-26412 Handle sink failure in RegionReplicationSink (#3815)
    omit 9cada84  HBASE-26407 Introduce a region replication sink for sinking WAL edits to secondary replicas directly (#3807)
     add 7a80b3a  HBASE-26463 Unreadable table names after HBASE-24605 (#3853)
     add 04635f4  HBASE-26467 Fix bug for MemStoreLABImpl.forceCopyOfBigCellInto(Cell) (#3858)
     new 5f1f64e  HBASE-26407 Introduce a region replication sink for sinking WAL edits to secondary replicas directly (#3807)
     new 4d59cb5  HBASE-26412 Handle sink failure in RegionReplicationSink (#3815)
     new 646c93c  HBASE-26413 Limit the total size of buffered region replication entries (#3844)
     new 8083542  HBASE-26449 The way we add or clear failedReplicas may have race (#3846)
     new 2c8696f  HBASE-26448 Make sure we do not flush a region too frequently (#3847)
     new 116f462  HBASE-26457 Should not always clear all the failed replicas when getting a flush all request (#3850)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (aa3d0b4)
            \
             N -- N -- N   refs/heads/HBASE-26233 (116f462)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../hadoop/hbase/regionserver/MemStoreLABImpl.java |  3 +-
 .../resources/hbase-webapps/static/css/hbase.css   |  5 ++++
 .../TestCompactingToCellFlatMapMemStore.java       | 28 +++++++++++++++++-
 .../hadoop/hbase/regionserver/TestMemStoreLAB.java | 33 +++++++++++++++++++++-
 4 files changed, 65 insertions(+), 4 deletions(-)

[hbase] 03/06: HBASE-26413 Limit the total size of buffered region replication entries (#3844)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-26233
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 646c93c46741c0959cea7eb6190843d17d5c0e72
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Sat Nov 13 10:45:54 2021 +0800

    HBASE-26413 Limit the total size of buffered region replication entries (#3844)
    
    Signed-off-by: GeorryHuang <hu...@apache.org>
---
 .../apache/hadoop/hbase/regionserver/HRegion.java  |  12 +-
 .../hadoop/hbase/regionserver/HRegionServer.java   |  10 +-
 .../hbase/regionserver/RegionServerServices.java   |  12 ++
 .../RegionReplicationBufferManager.java            | 145 +++++++++++++++
 .../RegionReplicationSink.java                     | 106 ++++++++---
 .../hadoop/hbase/regionserver/wal/WALUtil.java     |   2 +-
 .../hadoop/hbase/MockRegionServerServices.java     |  15 +-
 .../hadoop/hbase/master/MockRegionServer.java      |  13 +-
 .../TestRegionReplicationBufferManager.java        | 125 +++++++++++++
 .../TestRegionReplicationSink.java                 | 194 +++++++++++++++++++++
 .../TestMetaRegionReplicaReplication.java          |   5 -
 11 files changed, 595 insertions(+), 44 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 5de22eb..92b8734 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -145,6 +145,7 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.Write
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.ForbidMajorCompactionChecker;
+import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationSink;
 import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
@@ -1104,11 +1105,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       return;
     }
     status.setStatus("Initializaing region replication sink");
-    regionReplicationSink = Optional.of(new RegionReplicationSink(conf, regionInfo, td, () -> {
-      rss.getFlushRequester().requestFlush(this, new ArrayList<>(td.getColumnFamilyNames()),
-        FlushLifeCycleTracker.DUMMY);
-    }, rss.getAsyncClusterConnection()));
-
+    regionReplicationSink = Optional.of(new RegionReplicationSink(conf, regionInfo, td,
+      rss.getRegionReplicationBufferManager(), () -> rss.getFlushRequester().requestFlush(this,
+        new ArrayList<>(td.getColumnFamilyNames()), FlushLifeCycleTracker.DUMMY),
+      rss.getAsyncClusterConnection()));
   }
 
   /**
@@ -2491,7 +2491,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     boolean isCompactionNeeded();
   }
 
-  FlushResultImpl flushcache(boolean flushAllStores, boolean writeFlushRequestWalMarker,
+  public FlushResultImpl flushcache(boolean flushAllStores, boolean writeFlushRequestWalMarker,
     FlushLifeCycleTracker tracker) throws IOException {
     List<byte[]> families = null;
     if (flushAllStores) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 4bf2d9c..d81c5b3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -126,6 +126,7 @@ import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
 import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
 import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet;
 import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet;
+import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
 import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
@@ -459,6 +460,7 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
   // A timer to shutdown the process if abort takes too long
   private Timer abortMonitor;
 
+  private RegionReplicationBufferManager regionReplicationBufferManager;
   /**
    * Starts a HRegionServer at the default location.
    * <p/>
@@ -643,6 +645,7 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
       initializeZooKeeper();
       setupClusterConnection();
       bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker);
+      regionReplicationBufferManager = new RegionReplicationBufferManager(this);
       // Setup RPC client for master communication
       this.rpcClient = asyncClusterConnection.getRpcClient();
     } catch (Throwable t) {
@@ -881,7 +884,7 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
       closeUserRegions(abortRequested.get());
       LOG.info("stopping server " + this.serverName);
     }
-
+    regionReplicationBufferManager.stop();
     closeClusterConnection();
     // Closing the compactSplit thread before closing meta regions
     if (!this.killed && containsMetaTableRegions()) {
@@ -3495,4 +3498,9 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
     shutdownChore(fsUtilizationChore);
     shutdownChore(slowLogTableOpsChore);
   }
+
+  @Override
+  public RegionReplicationBufferManager getRegionReplicationBufferManager() {
+    return regionReplicationBufferManager;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index fce8df1..ec27fb2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
 import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
 import org.apache.hadoop.hbase.quotas.RegionSizeStore;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
+import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.access.AccessChecker;
 import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
@@ -317,4 +318,15 @@ public interface RegionServerServices extends Server, MutableOnlineRegions, Favo
    * @return {@link ZKPermissionWatcher}
    */
   ZKPermissionWatcher getZKPermissionWatcher();
+
+  RegionReplicationBufferManager getRegionReplicationBufferManager();
+
+  @Override
+  HRegion getRegion(String encodedRegionName);
+
+  @Override
+  List<HRegion> getRegions(TableName tableName) throws IOException;
+
+  @Override
+  List<HRegion> getRegions();
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationBufferManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationBufferManager.java
new file mode 100644
index 0000000..bda3cb4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationBufferManager.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.regionreplication;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Manage the buffer size for all {@link RegionReplicationSink} instances.
+ * <p/>
+ * If the buffer size exceeds the soft limit, we will find the region with the largest pending size
+ * and trigger a flush, so it can drop all its pending entries and save memory.
+ * <p/>
+ * If the buffer size exceeds the hard limit, we will return {@code false} for
+ * {@link #increase(long)} and let the {@link RegionReplicationSink} drop the edits immediately.
+ */
+@InterfaceAudience.Private
+public class RegionReplicationBufferManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RegionReplicationBufferManager.class);
+
+  /**
+   * This is the total size of pending entries for all the sinks.
+   */
+  public static final String MAX_PENDING_SIZE = "hbase.region.read-replica.sink.max-pending-size";
+
+  public static final long MAX_PENDING_SIZE_DEFAULT = 100L * 1024 * 1024;
+
+  public static final String SOFT_LIMIT_PERCENTAGE =
+    "hbase.region.read-replica.sink.max-pending-size.soft-limit-percentage";
+
+  public static final float SOFT_LIMIT_PERCENTAGE_DEFAULT = 0.8f;
+
+  private final RegionServerServices rsServices;
+
+  private final long maxPendingSize;
+
+  private final long softMaxPendingSize;
+
+  private final AtomicLong pendingSize = new AtomicLong();
+
+  private final ThreadPoolExecutor executor;
+
+  public RegionReplicationBufferManager(RegionServerServices rsServices) {
+    this.rsServices = rsServices;
+    Configuration conf = rsServices.getConfiguration();
+    this.maxPendingSize = conf.getLong(MAX_PENDING_SIZE, MAX_PENDING_SIZE_DEFAULT);
+    this.softMaxPendingSize =
+      (long) (conf.getFloat(SOFT_LIMIT_PERCENTAGE, SOFT_LIMIT_PERCENTAGE_DEFAULT) * maxPendingSize);
+    this.executor = new ThreadPoolExecutor(
+      1, 1, 1, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactoryBuilder()
+        .setDaemon(true).setNameFormat("Region-Replication-Flusher-%d").build(),
+      (r, e) -> LOG.debug("A flush task is ongoing, drop the new scheduled one"));
+    executor.allowCoreThreadTimeOut(true);
+  }
+
+  private void flush() {
+    long max = Long.MIN_VALUE;
+    HRegion toFlush = null;
+    for (HRegion region : rsServices.getRegions()) {
+      Optional<RegionReplicationSink> sink = region.getRegionReplicationSink();
+      if (sink.isPresent()) {
+        RegionReplicationSink s = sink.get();
+        long p = s.pendingSize();
+        if (p > max) {
+          max = p;
+          toFlush = region;
+        }
+      }
+    }
+    if (toFlush != null) {
+      // here we need to write flush marker out, so we can drop all the pending edits in the region
+      // replication sink.
+      try {
+        LOG.info("Going to flush {} with {} pending entry size", toFlush.getRegionInfo(),
+          StringUtils.TraditionalBinaryPrefix.long2String(max, "", 1));
+        FlushResult result = toFlush.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
+        if (!result.isFlushSucceeded()) {
+          LOG.warn("Failed to flush {}, the result is {}", toFlush.getRegionInfo(),
+            result.getResult());
+        }
+      } catch (IOException e) {
+        LOG.warn("Failed to flush {}", toFlush.getRegionInfo(), e);
+      }
+    } else {
+      // usually this should not happen but since the flush operation is async, theoretically it
+      // could happen. Let's log it, no real harm.
+      LOG.warn("Can not find a region to flush");
+    }
+  }
+
+  /**
+   * Increase the pending size and check whether we have reached the hard limit of max pending
+   * size.
+   * @return {@code true} means OK to keep buffering, {@code false} means drop all the edits.
+   */
+  public boolean increase(long size) {
+    long sz = pendingSize.addAndGet(size);
+    if (sz > softMaxPendingSize) {
+      executor.execute(this::flush);
+    }
+    return sz <= maxPendingSize;
+  }
+
+  /**
+   * Called after you ship the edits out.
+   */
+  public void decrease(long size) {
+    pendingSize.addAndGet(-size);
+  }
+
+  public void stop() {
+    executor.shutdown();
+  }
+}
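
For readers skimming the diff above: every RegionReplicationSink uses this buffer manager as a gate around its in-memory queue. increase(size) is called before buffering an entry; crossing the soft limit schedules an asynchronous flush of the region with the largest pending size, while crossing the hard limit makes increase return false so the sink drops its pending edits. decrease(size) is called once entries have been shipped or dropped. Below is a minimal, self-contained sketch of that accounting pattern; the SimpleBuffer class, its limits, and the print statement are illustrative stand-ins, not part of the patch.

import java.util.concurrent.atomic.AtomicLong;

/**
 * Illustrative only: mirrors the increase()/decrease() accounting contract of
 * RegionReplicationBufferManager with hypothetical names and limits.
 */
public class SimpleBuffer {

  private final long hardLimit;   // analogous to hbase.region.read-replica.sink.max-pending-size
  private final long softLimit;   // hardLimit * soft-limit-percentage
  private final AtomicLong pending = new AtomicLong();

  public SimpleBuffer(long hardLimit, float softPercentage) {
    this.hardLimit = hardLimit;
    this.softLimit = (long) (hardLimit * softPercentage);
  }

  /** @return true if the caller may keep buffering, false if it should drop its pending edits. */
  public boolean increase(long size) {
    long sz = pending.addAndGet(size);
    if (sz > softLimit) {
      // the real manager schedules an asynchronous flush of the region with the
      // largest pending size here; this sketch only reports that it would happen
      System.out.println("soft limit exceeded, would request a flush");
    }
    return sz <= hardLimit;
  }

  /** Called after the accounted entries have been shipped out or dropped. */
  public void decrease(long size) {
    pending.addAndGet(-size);
  }

  public static void main(String[] args) {
    SimpleBuffer buffer = new SimpleBuffer(100L * 1024 * 1024, 0.8f);
    long entrySize = 90L * 1024 * 1024;
    if (buffer.increase(entrySize)) {
      // buffer and replicate the entry, then hand the accounted bytes back
      buffer.decrease(entrySize);
    } else {
      // hard limit reached: the sink would drop all pending edits (and decrease their sizes)
      buffer.decrease(entrySize);
    }
  }
}
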
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
similarity index 74%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
index cdd77e8..9c6f6e2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.regionserver;
+package org.apache.hadoop.hbase.regionserver.regionreplication;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,10 +61,6 @@ public class RegionReplicationSink {
 
   private static final Logger LOG = LoggerFactory.getLogger(RegionReplicationSink.class);
 
-  public static final String MAX_PENDING_SIZE = "hbase.region.read-replica.sink.max-pending-size";
-
-  public static final long MAX_PENDING_SIZE_DEFAULT = 10L * 1024 * 1024;
-
   public static final String RETRIES_NUMBER = "hbase.region.read-replica.sink.retries.number";
 
   public static final int RETRIES_NUMBER_DEFAULT = 3;
@@ -85,10 +82,13 @@ public class RegionReplicationSink {
 
     final ServerCall<?> rpcCall;
 
+    final long size;
+
     SinkEntry(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
       this.key = key;
       this.edit = edit;
       this.rpcCall = rpcCall;
+      this.size = key.estimatedSerializedSizeOf() + edit.estimatedSerializedSizeOf();
       if (rpcCall != null) {
         // increase the reference count to avoid the rpc framework free the memory before we
         // actually sending them out.
@@ -112,6 +112,11 @@ public class RegionReplicationSink {
 
   private final TableDescriptor tableDesc;
 
+  // store it here to avoid passing it every time when calling TableDescriptor.getRegionReplication.
+  private final int regionReplication;
+
+  private final RegionReplicationBufferManager manager;
+
   private final Runnable flushRequester;
 
   private final AsyncClusterConnection conn;
@@ -128,20 +133,24 @@ public class RegionReplicationSink {
 
   private final long operationTimeoutNs;
 
+  private volatile long pendingSize;
+
   private boolean sending;
 
   private boolean stopping;
 
   private boolean stopped;
 
-  RegionReplicationSink(Configuration conf, RegionInfo primary, TableDescriptor td,
-    Runnable flushRequester, AsyncClusterConnection conn) {
+  public RegionReplicationSink(Configuration conf, RegionInfo primary, TableDescriptor td,
+    RegionReplicationBufferManager manager, Runnable flushRequester, AsyncClusterConnection conn) {
     Preconditions.checkArgument(RegionReplicaUtil.isDefaultReplica(primary), "%s is not primary",
       primary);
-    Preconditions.checkArgument(td.getRegionReplication() > 1,
-      "region replication should be greater than 1 but got %s", td.getRegionReplication());
+    this.regionReplication = td.getRegionReplication();
+    Preconditions.checkArgument(this.regionReplication > 1,
+      "region replication should be greater than 1 but got %s", this.regionReplication);
     this.primary = primary;
     this.tableDesc = td;
+    this.manager = manager;
     this.flushRequester = flushRequester;
     this.conn = conn;
     this.retries = conf.getInt(RETRIES_NUMBER, RETRIES_NUMBER_DEFAULT);
@@ -153,7 +162,12 @@ public class RegionReplicationSink {
 
   private void onComplete(List<SinkEntry> sent,
     Map<Integer, MutableObject<Throwable>> replica2Error) {
-    sent.forEach(SinkEntry::replicated);
+    long toReleaseSize = 0;
+    for (SinkEntry entry : sent) {
+      entry.replicated();
+      toReleaseSize += entry.size;
+    }
+    manager.decrease(toReleaseSize);
     Set<Integer> failed = new HashSet<>();
     for (Map.Entry<Integer, MutableObject<Throwable>> entry : replica2Error.entrySet()) {
       Integer replicaId = entry.getKey();
@@ -165,6 +179,7 @@ public class RegionReplicationSink {
       }
     }
     synchronized (entries) {
+      pendingSize -= toReleaseSize;
       if (!failed.isEmpty()) {
         failedReplicas.addAll(failed);
         flushRequester.run();
@@ -190,7 +205,7 @@ public class RegionReplicationSink {
       }
       toSend.add(entry);
     }
-    int toSendReplicaCount = tableDesc.getRegionReplication() - 1 - failedReplicas.size();
+    int toSendReplicaCount = regionReplication - 1 - failedReplicas.size();
     if (toSendReplicaCount <= 0) {
       return;
     }
@@ -199,7 +214,7 @@ public class RegionReplicationSink {
       toSend.stream().map(e -> new WAL.Entry(e.key, e.edit)).collect(Collectors.toList());
     AtomicInteger remaining = new AtomicInteger(toSendReplicaCount);
     Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
-    for (int replicaId = 1; replicaId < tableDesc.getRegionReplication(); replicaId++) {
+    for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
       MutableObject<Throwable> error = new MutableObject<>();
       replica2Error.put(replicaId, error);
       RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, replicaId);
@@ -223,6 +238,17 @@ public class RegionReplicationSink {
     return storesFlushed.containsAll(tableDesc.getColumnFamilyNames());
   }
 
+  private void clearAllEntries() {
+    long toClearSize = 0;
+    for (SinkEntry entry : entries) {
+      toClearSize += entry.size;
+      entry.replicated();
+    }
+    entries.clear();
+    pendingSize -= toClearSize;
+    manager.decrease(toClearSize);
+  }
+
   /**
    * Add this edit to replication queue.
    * <p/>
@@ -251,31 +277,67 @@ public class RegionReplicationSink {
               continue;
             }
             if (flushDesc != null && flushAllStores(flushDesc)) {
-              LOG.debug("Got a flush all request, clear failed replicas {} and {} pending" +
-                " replication entries", failedReplicas, entries.size());
-              entries.clear();
+              int toClearCount = 0;
+              long toClearSize = 0;
+              for (;;) {
+                SinkEntry e = entries.peek();
+                if (e == null) {
+                  break;
+                }
+                if (e.key.getSequenceId() < flushDesc.getFlushSequenceNumber()) {
+                  entries.poll();
+                  toClearCount++;
+                  toClearSize += e.size;
+                } else {
+                  break;
+                }
+              }
               failedReplicas.clear();
+              LOG.debug(
+                "Got a flush all request with sequence id {}, clear failed replicas {}" +
+                  " and {} pending entries with size {}",
+                flushDesc.getFlushSequenceNumber(), failedReplicas, toClearCount,
+                StringUtils.TraditionalBinaryPrefix.long2String(toClearSize, "", 1));
             }
           }
         }
       }
-      // TODO: limit the total cached entries here, and we should have a global limitation, not for
-      // only this region.
-      entries.add(new SinkEntry(key, edit, rpcCall));
-      if (!sending) {
-        send();
+      if (failedReplicas.size() == regionReplication - 1) {
+        // this means we have marked all the replicas as failed, so just give up here
+        return;
+      }
+      SinkEntry entry = new SinkEntry(key, edit, rpcCall);
+      entries.add(entry);
+      pendingSize += entry.size;
+      if (manager.increase(entry.size)) {
+        if (!sending) {
+          send();
+        }
+      } else {
+        // we have exceeded the max pending size, so drop all the edits and mark all replicas as
+        // failed
+        clearAllEntries();
+        for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
+          failedReplicas.add(replicaId);
+        }
+        flushRequester.run();
       }
     }
   }
 
+  long pendingSize() {
+    return pendingSize;
+  }
+
   /**
    * Stop the replication sink.
    * <p/>
    * Usually this should only be called when you want to close a region.
    */
-  void stop() {
+  public void stop() {
     synchronized (entries) {
       stopping = true;
+      clearAllEntries();
       if (!sending) {
         stopped = true;
         entries.notifyAll();
@@ -291,7 +353,7 @@ public class RegionReplicationSink {
    * <p/>
    * This is used to keep the replicating order the same with the WAL edit order when writing.
    */
-  void waitUntilStopped() throws InterruptedException {
+  public void waitUntilStopped() throws InterruptedException {
     synchronized (entries) {
       while (!stopped) {
         entries.wait();
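
One detail of the sink changes above that is easy to miss: when a flush-all marker with a given flush sequence number arrives, every buffered entry whose sequence id is below that number has already been persisted by the flush, so it no longer needs to be replicated and can be dropped, with its size handed back to the buffer manager. Below is a hedged, stand-alone sketch of that trimming step; the Entry class and the sizes are illustrative, not the patch's types.

import java.util.ArrayDeque;
import java.util.Queue;

public class FlushTrimSketch {

  /** Illustrative stand-in for the patch's SinkEntry: a sequence id plus an estimated size. */
  static final class Entry {
    final long sequenceId;
    final long size;

    Entry(long sequenceId, long size) {
      this.sequenceId = sequenceId;
      this.size = size;
    }
  }

  /**
   * Drop every queued entry that the flush with {@code flushSequenceNumber} has already persisted.
   * @return the total size of the dropped entries, to be handed back to the buffer manager.
   */
  static long trimOnFlushAll(Queue<Entry> entries, long flushSequenceNumber) {
    long dropped = 0;
    for (;;) {
      Entry e = entries.peek();
      if (e == null || e.sequenceId >= flushSequenceNumber) {
        break;
      }
      entries.poll();
      dropped += e.size;
    }
    return dropped;
  }

  public static void main(String[] args) {
    Queue<Entry> entries = new ArrayDeque<>();
    entries.add(new Entry(10, 100));
    entries.add(new Entry(20, 200));
    entries.add(new Entry(30, 300));
    // a flush-all marker with sequence id 25 persists the edits at 10 and 20, so they can be dropped
    long dropped = trimOnFlushAll(entries, 25);
    System.out.println("dropped " + dropped + " bytes, " + entries.size() + " entry left");
  }
}
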
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
index 15be95e..2076dd4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
-import org.apache.hadoop.hbase.regionserver.RegionReplicationSink;
+import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationSink;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.wal.WAL;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index 82f7a85..ebe6edd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -47,13 +47,13 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
 import org.apache.hadoop.hbase.regionserver.LeaseManager;
 import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
-import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
 import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
 import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
+import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.access.AccessChecker;
 import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
@@ -72,7 +72,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
  */
 public class MockRegionServerServices implements RegionServerServices {
   protected static final Logger LOG = LoggerFactory.getLogger(MockRegionServerServices.class);
-  private final Map<String, Region> regions = new HashMap<>();
+  private final Map<String, HRegion> regions = new HashMap<>();
   private final ConcurrentSkipListMap<byte[], Boolean> rit =
     new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
   private HFileSystem hfs = null;
@@ -108,17 +108,17 @@ public class MockRegionServerServices implements RegionServerServices {
   }
 
   @Override
-  public Region getRegion(String encodedRegionName) {
+  public HRegion getRegion(String encodedRegionName) {
     return this.regions.get(encodedRegionName);
   }
 
   @Override
-  public List<Region> getRegions(TableName tableName) throws IOException {
+  public List<HRegion> getRegions(TableName tableName) throws IOException {
     return null;
   }
 
   @Override
-  public List<Region> getRegions() {
+  public List<HRegion> getRegions() {
     return null;
   }
 
@@ -379,4 +379,9 @@ public class MockRegionServerServices implements RegionServerServices {
   public AsyncClusterConnection getAsyncClusterConnection() {
     return null;
   }
+
+  @Override
+  public RegionReplicationBufferManager getRegionReplicationBufferManager() {
+    return null;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 69a7a79..56813af 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -61,17 +61,16 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
 import org.apache.hadoop.hbase.regionserver.LeaseManager;
 import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
-import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
 import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
 import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
+import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.access.AccessChecker;
 import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@@ -138,6 +137,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBul
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
 
@@ -463,7 +463,7 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
   }
 
   @Override
-  public List<Region> getRegions() {
+  public List<HRegion> getRegions() {
     return null;
   }
 
@@ -527,7 +527,7 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
   }
 
   @Override
-  public List<Region> getRegions(TableName tableName) throws IOException {
+  public List<HRegion> getRegions(TableName tableName) throws IOException {
     return null;
   }
 
@@ -750,4 +750,9 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
   public AsyncClusterConnection getAsyncClusterConnection() {
     return null;
   }
+
+  @Override
+  public RegionReplicationBufferManager getRegionReplicationBufferManager() {
+    return null;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationBufferManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationBufferManager.java
new file mode 100644
index 0000000..8b56d09
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationBufferManager.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.regionreplication;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestRegionReplicationBufferManager {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRegionReplicationBufferManager.class);
+
+  private Configuration conf;
+
+  private RegionServerServices rsServices;
+
+  private RegionReplicationBufferManager manager;
+
+  @Before
+  public void setUp() {
+    conf = HBaseConfiguration.create();
+    rsServices = mock(RegionServerServices.class);
+    when(rsServices.getConfiguration()).thenReturn(conf);
+  }
+
+  @After
+  public void tearDown() {
+    if (manager != null) {
+      manager.stop();
+    }
+  }
+
+  private HRegion mockRegion(RegionInfo regionInfo, long pendingSize) throws IOException {
+    HRegion region = mock(HRegion.class);
+    when(region.getRegionInfo()).thenReturn(regionInfo);
+    if (pendingSize < 0) {
+      when(region.getRegionReplicationSink()).thenReturn(Optional.empty());
+    } else {
+      RegionReplicationSink sink = mock(RegionReplicationSink.class);
+      when(sink.pendingSize()).thenReturn(pendingSize);
+      when(region.getRegionReplicationSink()).thenReturn(Optional.of(sink));
+    }
+    return region;
+  }
+
+  @Test
+  public void testScheduleFlush() throws IOException, InterruptedException {
+    conf.setLong(RegionReplicationBufferManager.MAX_PENDING_SIZE, 1024 * 1024);
+    manager = new RegionReplicationBufferManager(rsServices);
+    RegionInfo info1 = RegionInfoBuilder.newBuilder(TableName.valueOf("info1")).build();
+    RegionInfo info2 = RegionInfoBuilder.newBuilder(TableName.valueOf("info2")).build();
+    HRegion region1 = mockRegion(info1, 1000);
+    HRegion region2 = mockRegion(info2, 10000);
+    when(rsServices.getRegions()).thenReturn(Arrays.asList(region1, region2));
+    CountDownLatch arrive = new CountDownLatch(1);
+    CountDownLatch resume = new CountDownLatch(1);
+    when(region2.flushcache(anyBoolean(), anyBoolean(), any())).then(i -> {
+      arrive.countDown();
+      resume.await();
+      FlushResultImpl result = mock(FlushResultImpl.class);
+      when(result.isFlushSucceeded()).thenReturn(true);
+      return result;
+    });
+    // hit the soft limit, should trigger a flush
+    assertTrue(manager.increase(1000 * 1024));
+    arrive.await();
+
+    // we should have called getRegions once to find the region to flush
+    verify(rsServices, times(1)).getRegions();
+
+    // hit the hard limit, but since the background flush is still running (we have not called
+    // resume.countDown yet), the newly scheduled background flush task should be discarded
+    // silently.
+    assertFalse(manager.increase(100 * 1024));
+    resume.countDown();
+
+    // wait several seconds and then check the getRegions call; we should not call it a second time
+    Thread.sleep(2000);
+    verify(rsServices, times(1)).getRegions();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
new file mode 100644
index 0000000..19b1698
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.regionreplication;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableNameTestRule;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.ipc.ServerCall;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestRegionReplicationSink {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRegionReplicationSink.class);
+
+  private Configuration conf;
+
+  private TableDescriptor td;
+
+  private RegionInfo primary;
+
+  private Runnable flushRequester;
+
+  private AsyncClusterConnection conn;
+
+  private RegionReplicationBufferManager manager;
+
+  @Rule
+  public final TableNameTestRule name = new TableNameTestRule();
+
+  @Before
+  public void setUp() {
+    conf = HBaseConfiguration.create();
+    td = TableDescriptorBuilder.newBuilder(name.getTableName())
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf")).setRegionReplication(3).build();
+    primary = RegionInfoBuilder.newBuilder(name.getTableName()).build();
+    flushRequester = mock(Runnable.class);
+    conn = mock(AsyncClusterConnection.class);
+    manager = mock(RegionReplicationBufferManager.class);
+  }
+
+  private RegionReplicationSink create() {
+    return new RegionReplicationSink(conf, primary, td, manager, flushRequester, conn);
+  }
+
+  @Test
+  public void testNormal() {
+    RegionReplicationSink sink = create();
+    MutableInt next = new MutableInt(0);
+    List<CompletableFuture<Void>> futures =
+      Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
+    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
+      .then(i -> futures.get(next.getAndIncrement()));
+    ServerCall<?> rpcCall = mock(ServerCall.class);
+    WALKeyImpl key = mock(WALKeyImpl.class);
+    when(key.estimatedSerializedSizeOf()).thenReturn(100L);
+    WALEdit edit = mock(WALEdit.class);
+    when(edit.estimatedSerializedSizeOf()).thenReturn(1000L);
+    when(manager.increase(anyLong())).thenReturn(true);
+
+    sink.add(key, edit, rpcCall);
+    // should call increase on manager
+    verify(manager, times(1)).increase(anyLong());
+    // should have been retained
+    verify(rpcCall, times(1)).retainByWAL();
+    assertEquals(1100, sink.pendingSize());
+
+    futures.get(0).complete(null);
+    // should not call decrease yet
+    verify(manager, never()).decrease(anyLong());
+    // should not call release yet
+    verify(rpcCall, never()).releaseByWAL();
+    assertEquals(1100, sink.pendingSize());
+
+    futures.get(1).complete(null);
+    // should call decrease
+    verify(manager, times(1)).decrease(anyLong());
+    // should call release
+    verify(rpcCall, times(1)).releaseByWAL();
+    assertEquals(0, sink.pendingSize());
+  }
+
+  @Test
+  public void testDropEdits() {
+    RegionReplicationSink sink = create();
+    MutableInt next = new MutableInt(0);
+    List<CompletableFuture<Void>> futures =
+      Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
+    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
+      .then(i -> futures.get(next.getAndIncrement()));
+    ServerCall<?> rpcCall1 = mock(ServerCall.class);
+    WALKeyImpl key1 = mock(WALKeyImpl.class);
+    when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
+    WALEdit edit1 = mock(WALEdit.class);
+    when(edit1.estimatedSerializedSizeOf()).thenReturn(1000L);
+    when(manager.increase(anyLong())).thenReturn(true);
+
+    sink.add(key1, edit1, rpcCall1);
+    verify(manager, times(1)).increase(anyLong());
+    verify(manager, never()).decrease(anyLong());
+    verify(rpcCall1, times(1)).retainByWAL();
+    assertEquals(1100, sink.pendingSize());
+
+    ServerCall<?> rpcCall2 = mock(ServerCall.class);
+    WALKeyImpl key2 = mock(WALKeyImpl.class);
+    when(key2.estimatedSerializedSizeOf()).thenReturn(200L);
+    WALEdit edit2 = mock(WALEdit.class);
+    when(edit2.estimatedSerializedSizeOf()).thenReturn(2000L);
+
+    sink.add(key2, edit2, rpcCall2);
+    verify(manager, times(2)).increase(anyLong());
+    verify(manager, never()).decrease(anyLong());
+    verify(rpcCall2, times(1)).retainByWAL();
+    assertEquals(3300, sink.pendingSize());
+
+    ServerCall<?> rpcCall3 = mock(ServerCall.class);
+    WALKeyImpl key3 = mock(WALKeyImpl.class);
+    when(key3.estimatedSerializedSizeOf()).thenReturn(200L);
+    WALEdit edit3 = mock(WALEdit.class);
+    when(edit3.estimatedSerializedSizeOf()).thenReturn(3000L);
+    when(manager.increase(anyLong())).thenReturn(false);
+
+    // should not buffer this edit
+    sink.add(key3, edit3, rpcCall3);
+    verify(manager, times(3)).increase(anyLong());
+    verify(manager, times(1)).decrease(anyLong());
+    // should retain and then release immediately
+    verify(rpcCall3, times(1)).retainByWAL();
+    verify(rpcCall3, times(1)).releaseByWAL();
+    // should also clear the pending edit
+    verify(rpcCall2, times(1)).releaseByWAL();
+    assertEquals(1100, sink.pendingSize());
+    // should have requested a flush
+    verify(flushRequester, times(1)).run();
+
+    // finish the replication for the first edit; we should decrease the size, release the rpc call,
+    // and the pendingSize should be 0 as there are no pending entries
+    futures.forEach(f -> f.complete(null));
+    verify(manager, times(2)).decrease(anyLong());
+    verify(rpcCall1, times(1)).releaseByWAL();
+    assertEquals(0, sink.pendingSize());
+
+    // should only call replicate 2 times for replicating the first edit, as we have 2 secondary
+    // replicas
+    verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java
index b501ab2..a4da640 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java
@@ -86,11 +86,6 @@ public class TestMetaRegionReplicaReplication {
   @Before
   public void before() throws Exception {
     Configuration conf = HTU.getConfiguration();
-    conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
-    conf.setInt("replication.source.size.capacity", 10240);
-    conf.setLong("replication.source.sleepforretries", 100);
-    conf.setInt("hbase.regionserver.maxlogs", 10);
-    conf.setLong("hbase.master.logcleaner.ttl", 10);
     conf.setInt("zookeeper.recovery.retry", 1);
     conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
     conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);

[hbase] 05/06: HBASE-26448 Make sure we do not flush a region too frequently (#3847)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-26233
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 2c8696f49482277fa096e999edb01695b968b3b5
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Tue Nov 16 00:06:54 2021 +0800

    HBASE-26448 Make sure we do not flush a region too frequently (#3847)
    
    Signed-off-by: Xiaolin Ha <ha...@apache.org>
---
 .../RegionReplicationFlushRequester.java           | 142 +++++++++++++++++++++
 .../regionreplication/RegionReplicationSink.java   |  24 ++--
 .../TestRegionReplicationFlushRequester.java       |  84 ++++++++++++
 3 files changed, 239 insertions(+), 11 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationFlushRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationFlushRequester.java
new file mode 100644
index 0000000..960f57e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationFlushRequester.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.regionreplication;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import org.apache.hbase.thirdparty.io.netty.util.Timeout;
+
+/**
+ * A helper class for requesting flush on a given region.
+ * <p/>
+ * In general, we do not want to trigger flushes too frequently for a region, so here we add
+ * something like rate control, i.e., the interval between two flush requests should not be too
+ * small.
+ */
+@InterfaceAudience.Private
+class RegionReplicationFlushRequester {
+
+  /**
+   * The timer for executing delayed flush request task.
+   * <p/>
+   * It will be shared across all instances of {@link RegionReplicationFlushRequester}. Created on
+   * demand to save one extra thread as not every user uses region replication.
+   */
+  private static volatile HashedWheelTimer TIMER;
+
+  /**
+   * The minimum interval between two flush requests
+   */
+  public static final String MIN_INTERVAL_SECS =
+    "hbase.region.read-replica.sink.flush.min-interval.secs";
+
+  public static final int MIN_INTERVAL_SECS_DEFAULT = 30;
+
+  private final Runnable flushRequester;
+
+  private final long minIntervalSecs;
+
+  private long lastRequestNanos;
+
+  private long pendingFlushRequestSequenceId;
+
+  private long lastFlushedSequenceId;
+
+  private Timeout pendingFlushRequest;
+
+  RegionReplicationFlushRequester(Configuration conf, Runnable flushRequester) {
+    this.flushRequester = flushRequester;
+    this.minIntervalSecs = conf.getInt(MIN_INTERVAL_SECS, MIN_INTERVAL_SECS_DEFAULT);
+  }
+
+  private static HashedWheelTimer getTimer() {
+    HashedWheelTimer timer = TIMER;
+    if (timer != null) {
+      return timer;
+    }
+    synchronized (RegionReplicationFlushRequester.class) {
+      timer = TIMER;
+      if (timer != null) {
+        return timer;
+      }
+      timer = new HashedWheelTimer(
+        new ThreadFactoryBuilder().setNameFormat("RegionReplicationFlushRequester-Timer-pool-%d")
+          .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
+        500, TimeUnit.MILLISECONDS);
+      TIMER = timer;
+    }
+    return timer;
+  }
+
+  private void request() {
+    flushRequester.run();
+    lastRequestNanos = System.nanoTime();
+  }
+
+  private synchronized void flush(Timeout timeout) {
+    pendingFlushRequest = null;
+    if (pendingFlushRequestSequenceId >= lastFlushedSequenceId) {
+      request();
+    }
+  }
+
+  /**
+   * Request a flush for the given region.
+   * <p/>
+   * The given {@code sequenceId} is the sequence id of the edit which we failed to replicate. A
+   * flush must happen after this sequence id to recover from the failure.
+   */
+  synchronized void requestFlush(long sequenceId) {
+    // if there is already a flush task, just reuse it.
+    if (pendingFlushRequest != null) {
+      pendingFlushRequestSequenceId = Math.max(sequenceId, pendingFlushRequestSequenceId);
+      return;
+    }
+    // check last flush time
+    long elapsedSecs = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - lastRequestNanos);
+    if (elapsedSecs >= minIntervalSecs) {
+      request();
+      return;
+    }
+    // schedule a timer task
+    HashedWheelTimer timer = getTimer();
+    pendingFlushRequest =
+      timer.newTimeout(this::flush, minIntervalSecs - elapsedSecs, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Record that we have already finished a flush with the given {@code sequenceId}.
+   * <p/>
+   * We can cancel the pending flush request if the failed sequence id is less than the given
+   * {@code sequenceId}.
+   */
+  synchronized void recordFlush(long sequenceId) {
+    this.lastFlushedSequenceId = sequenceId;
+    // cancel the pending flush request if necessary, i.e., we have already finished a flush
+    // with a higher sequence id.
+    if (sequenceId > pendingFlushRequestSequenceId && pendingFlushRequest != null) {
+      pendingFlushRequest.cancel();
+      pendingFlushRequest = null;
+    }
+  }
+}
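
The requester above is essentially a per-region rate limiter for flush requests: a request arriving inside the minimum interval is deferred via a timer, further requests made while one is pending only widen the sequence id it has to cover, and a completed flush with a high enough sequence id cancels the deferred request. Here is a hedged sketch of the same decision logic built on a plain ScheduledExecutorService instead of the patch's shared netty HashedWheelTimer; all class and thread names are illustrative. A usage sketch follows the RegionReplicationSink diff below.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * Illustrative only: the same rate-limiting decisions as RegionReplicationFlushRequester,
 * but built on a ScheduledExecutorService instead of the shared netty HashedWheelTimer.
 */
public class RateLimitedFlushSketch {

  private final Runnable flushRequester;
  private final long minIntervalSecs;
  private final ScheduledExecutorService timer =
    Executors.newSingleThreadScheduledExecutor(r -> {
      Thread t = new Thread(r, "flush-requester-timer");
      t.setDaemon(true);
      return t;
    });

  private long lastRequestNanos;
  private long pendingSequenceId;
  private long lastFlushedSequenceId;
  private ScheduledFuture<?> pendingRequest;

  public RateLimitedFlushSketch(Runnable flushRequester, long minIntervalSecs) {
    this.flushRequester = flushRequester;
    this.minIntervalSecs = minIntervalSecs;
  }

  private void request() {
    flushRequester.run();
    lastRequestNanos = System.nanoTime();
  }

  /** Ask for a flush covering {@code sequenceId}; requests inside the minimum interval are delayed. */
  public synchronized void requestFlush(long sequenceId) {
    if (pendingRequest != null) {
      // a delayed request is already queued, just widen the sequence id it has to cover
      pendingSequenceId = Math.max(pendingSequenceId, sequenceId);
      return;
    }
    long elapsedSecs = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - lastRequestNanos);
    if (elapsedSecs >= minIntervalSecs) {
      request();
      return;
    }
    pendingSequenceId = sequenceId;
    pendingRequest =
      timer.schedule(this::firePending, minIntervalSecs - elapsedSecs, TimeUnit.SECONDS);
  }

  private synchronized void firePending() {
    pendingRequest = null;
    // only flush if no later flush has already covered the failed sequence id
    if (pendingSequenceId >= lastFlushedSequenceId) {
      request();
    }
  }

  /** Record a finished flush; it may make a queued delayed request unnecessary. */
  public synchronized void recordFlush(long sequenceId) {
    lastFlushedSequenceId = sequenceId;
    if (pendingRequest != null && sequenceId > pendingSequenceId) {
      pendingRequest.cancel(false);
      pendingRequest = null;
    }
  }
}
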
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
index 68aa508..9095870 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
@@ -118,7 +118,7 @@ public class RegionReplicationSink {
 
   private final RegionReplicationBufferManager manager;
 
-  private final Runnable flushRequester;
+  private final RegionReplicationFlushRequester flushRequester;
 
   private final AsyncClusterConnection conn;
 
@@ -136,7 +136,7 @@ public class RegionReplicationSink {
 
   private volatile long pendingSize;
 
-  private long lastFlushSequenceNumber;
+  private long lastFlushedSequenceId;
 
   private boolean sending;
 
@@ -154,7 +154,7 @@ public class RegionReplicationSink {
     this.primary = primary;
     this.tableDesc = td;
     this.manager = manager;
-    this.flushRequester = flushRequester;
+    this.flushRequester = new RegionReplicationFlushRequester(conf, flushRequester);
     this.conn = conn;
     this.retries = conf.getInt(RETRIES_NUMBER, RETRIES_NUMBER_DEFAULT);
     this.rpcTimeoutNs =
@@ -178,19 +178,19 @@ public class RegionReplicationSink {
       Integer replicaId = entry.getKey();
       Throwable error = entry.getValue().getValue();
       if (error != null) {
-        if (maxSequenceId > lastFlushSequenceNumber) {
+        if (maxSequenceId > lastFlushedSequenceId) {
           LOG.warn(
             "Failed to replicate to secondary replica {} for {}, since the max sequence" +
               " id of sunk entris is {}, which is greater than the last flush SN {}," +
               " we will stop replicating for a while and trigger a flush",
-            replicaId, primary, maxSequenceId, lastFlushSequenceNumber, error);
+            replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
           failed.add(replicaId);
         } else {
           LOG.warn(
             "Failed to replicate to secondary replica {} for {}, since the max sequence" +
               " id of sunk entris is {}, which is less than or equal to the last flush SN {}," +
               " we will not stop replicating",
-            replicaId, primary, maxSequenceId, lastFlushSequenceNumber, error);
+            replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
         }
       }
     }
@@ -198,7 +198,7 @@ public class RegionReplicationSink {
       pendingSize -= toReleaseSize;
       if (!failed.isEmpty()) {
         failedReplicas.addAll(failed);
-        flushRequester.run();
+        flushRequester.requestFlush(maxSequenceId);
       }
       sending = false;
       if (stopping) {
@@ -296,6 +296,7 @@ public class RegionReplicationSink {
               continue;
             }
             if (flushDesc != null && flushAllStores(flushDesc)) {
+              long flushedSequenceId = flushDesc.getFlushSequenceNumber();
               int toClearCount = 0;
               long toClearSize = 0;
               for (;;) {
@@ -303,7 +304,7 @@ public class RegionReplicationSink {
                 if (e == null) {
                   break;
                 }
-                if (e.key.getSequenceId() < flushDesc.getFlushSequenceNumber()) {
+                if (e.key.getSequenceId() < flushedSequenceId) {
                   entries.poll();
                   toClearCount++;
                   toClearSize += e.size;
@@ -311,13 +312,14 @@ public class RegionReplicationSink {
                   break;
                 }
               }
-              lastFlushSequenceNumber = flushDesc.getFlushSequenceNumber();
+              lastFlushedSequenceId = flushedSequenceId;
               failedReplicas.clear();
               LOG.debug(
                 "Got a flush all request with sequence id {}, clear failed replicas {}" +
                   " and {} pending entries with size {}",
-                flushDesc.getFlushSequenceNumber(), failedReplicas, toClearCount,
+                flushedSequenceId, failedReplicas, toClearCount,
                 StringUtils.TraditionalBinaryPrefix.long2String(toClearSize, "", 1));
+              flushRequester.recordFlush(flushedSequenceId);
             }
           }
         }
@@ -340,7 +342,7 @@ public class RegionReplicationSink {
         for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
           failedReplicas.add(replicaId);
         }
-        flushRequester.run();
+        flushRequester.requestFlush(entry.key.getSequenceId());
       }
     }
   }
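
One detail in the onComplete() hunk above is easy to miss: a replication failure only marks a
secondary replica as failed when the failed batch contains edits newer than the last recorded
flush; if the batch is at or below the last flush sequence id, those edits are presumably already
covered by the flushed data, so replication simply continues. A tiny self-contained restatement of
that comparison (names below are illustrative):

    // Illustrative restatement of the failed-replica decision in onComplete().
    public class FailedReplicaCheckSketch {
      static boolean shouldMarkFailed(long maxSequenceIdOfFailedBatch, long lastFlushedSequenceId) {
        // only edits newer than the last flush can be missing on the secondary replica
        return maxSequenceIdOfFailedBatch > lastFlushedSequenceId;
      }

      public static void main(String[] args) {
        System.out.println(shouldMarkFailed(150L, 100L)); // true: stop replicating, request a flush
        System.out.println(shouldMarkFailed(80L, 100L));  // false: keep replicating
      }
    }
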
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationFlushRequester.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationFlushRequester.java
new file mode 100644
index 0000000..abe5aa1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationFlushRequester.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.regionreplication;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestRegionReplicationFlushRequester {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRegionReplicationFlushRequester.class);
+
+  private Configuration conf;
+
+  private Runnable requester;
+
+  private RegionReplicationFlushRequester flushRequester;
+
+  @Before
+  public void setUp() {
+    conf = HBaseConfiguration.create();
+    conf.setInt(RegionReplicationFlushRequester.MIN_INTERVAL_SECS, 1);
+    requester = mock(Runnable.class);
+    flushRequester = new RegionReplicationFlushRequester(conf, requester);
+  }
+
+  @Test
+  public void testRequest() throws InterruptedException {
+    // should call request directly
+    flushRequester.requestFlush(100L);
+    verify(requester, times(1)).run();
+
+    // should not call request directly, since the min interval is 1 second
+    flushRequester.requestFlush(200L);
+    verify(requester, times(1)).run();
+    Thread.sleep(2000);
+    verify(requester, times(2)).run();
+
+    // should call request directly because we have already elapsed more than 1 second
+    Thread.sleep(2000);
+    flushRequester.requestFlush(300L);
+    verify(requester, times(3)).run();
+  }
+
+  @Test
+  public void testCancelFlushRequest() throws InterruptedException {
+    flushRequester.requestFlush(100L);
+    flushRequester.requestFlush(200L);
+    verify(requester, times(1)).run();
+
+    // the pending flush request should be canceled
+    flushRequester.recordFlush(300L);
+    Thread.sleep(2000);
+    verify(requester, times(1)).run();
+  }
+}

[hbase] 02/06: HBASE-26412 Handle sink failure in RegionReplicationSink (#3815)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-26233
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 4d59cb5d1f3eea5f457958d9b11bc25b0b1c6a9e
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Tue Nov 9 21:41:25 2021 +0800

    HBASE-26412 Handle sink failure in RegionReplicationSink (#3815)
    
    Signed-off-by: GeorryHuang <hu...@apache.org>
---
 .../apache/hadoop/hbase/regionserver/HRegion.java  |   7 +-
 .../hbase/regionserver/RegionReplicationSink.java  | 147 ++++++++++++++-----
 .../TestRegionReplicaReplicationError.java         | 158 +++++++++++++++++++++
 .../regionserver/TestRegionReplicaReplication.java |   4 +-
 4 files changed, 275 insertions(+), 41 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index c611a8a..5de22eb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1104,8 +1104,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       return;
     }
     status.setStatus("Initializaing region replication sink");
-    regionReplicationSink = Optional.of(new RegionReplicationSink(conf, regionInfo,
-      regionReplication, td.hasRegionMemStoreReplication(), rss.getAsyncClusterConnection()));
+    regionReplicationSink = Optional.of(new RegionReplicationSink(conf, regionInfo, td, () -> {
+      rss.getFlushRequester().requestFlush(this, new ArrayList<>(td.getColumnFamilyNames()),
+        FlushLifeCycleTracker.DUMMY);
+    }, rss.getAsyncClusterConnection()));
+
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java
index 6911289..cdd77e8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java
@@ -17,18 +17,29 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
-import java.util.concurrent.CompletableFuture;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.ipc.ServerCall;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -39,6 +50,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
+
 /**
  * The class for replicating WAL edits to secondary replicas, one instance per region.
  */
@@ -97,35 +110,39 @@ public class RegionReplicationSink {
 
   private final RegionInfo primary;
 
-  private final int regionReplication;
-
-  private final boolean hasRegionMemStoreReplication;
+  private final TableDescriptor tableDesc;
 
-  private final Queue<SinkEntry> entries = new ArrayDeque<>();
+  private final Runnable flushRequester;
 
   private final AsyncClusterConnection conn;
 
+  // used to track the replicas to which we failed to replicate edits; the set
+  // will be cleared after we get a flush edit.
+  private final Set<Integer> failedReplicas = new HashSet<>();
+
+  private final Queue<SinkEntry> entries = new ArrayDeque<>();
+
   private final int retries;
 
   private final long rpcTimeoutNs;
 
   private final long operationTimeoutNs;
 
-  private CompletableFuture<Void> future;
+  private boolean sending;
 
   private boolean stopping;
 
   private boolean stopped;
 
-  RegionReplicationSink(Configuration conf, RegionInfo primary, int regionReplication,
-    boolean hasRegionMemStoreReplication, AsyncClusterConnection conn) {
+  RegionReplicationSink(Configuration conf, RegionInfo primary, TableDescriptor td,
+    Runnable flushRequester, AsyncClusterConnection conn) {
     Preconditions.checkArgument(RegionReplicaUtil.isDefaultReplica(primary), "%s is not primary",
       primary);
-    Preconditions.checkArgument(regionReplication > 1,
-      "region replication should be greater than 1 but got %s", regionReplication);
+    Preconditions.checkArgument(td.getRegionReplication() > 1,
+      "region replication should be greater than 1 but got %s", td.getRegionReplication());
     this.primary = primary;
-    this.regionReplication = regionReplication;
-    this.hasRegionMemStoreReplication = hasRegionMemStoreReplication;
+    this.tableDesc = td;
+    this.flushRequester = flushRequester;
     this.conn = conn;
     this.retries = conf.getInt(RETRIES_NUMBER, RETRIES_NUMBER_DEFAULT);
     this.rpcTimeoutNs =
@@ -134,6 +151,36 @@ public class RegionReplicationSink {
       .toNanos(conf.getLong(OPERATION_TIMEOUT_MS, OPERATION_TIMEOUT_MS_DEFAULT));
   }
 
+  private void onComplete(List<SinkEntry> sent,
+    Map<Integer, MutableObject<Throwable>> replica2Error) {
+    sent.forEach(SinkEntry::replicated);
+    Set<Integer> failed = new HashSet<>();
+    for (Map.Entry<Integer, MutableObject<Throwable>> entry : replica2Error.entrySet()) {
+      Integer replicaId = entry.getKey();
+      Throwable error = entry.getValue().getValue();
+      if (error != null) {
+        LOG.warn("Failed to replicate to secondary replica {} for {}, stop replicating" +
+          " for a while and trigger a flush", replicaId, primary, error);
+        failed.add(replicaId);
+      }
+    }
+    synchronized (entries) {
+      if (!failed.isEmpty()) {
+        failedReplicas.addAll(failed);
+        flushRequester.run();
+      }
+      sending = false;
+      if (stopping) {
+        stopped = true;
+        entries.notifyAll();
+        return;
+      }
+      if (!entries.isEmpty()) {
+        send();
+      }
+    }
+  }
+
   private void send() {
     List<SinkEntry> toSend = new ArrayList<>();
     for (SinkEntry entry;;) {
@@ -143,32 +190,37 @@ public class RegionReplicationSink {
       }
       toSend.add(entry);
     }
+    int toSendReplicaCount = tableDesc.getRegionReplication() - 1 - failedReplicas.size();
+    if (toSendReplicaCount <= 0) {
+      return;
+    }
+    sending = true;
     List<WAL.Entry> walEntries =
       toSend.stream().map(e -> new WAL.Entry(e.key, e.edit)).collect(Collectors.toList());
-    List<CompletableFuture<Void>> futures = new ArrayList<>();
-    for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
+    AtomicInteger remaining = new AtomicInteger(toSendReplicaCount);
+    Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
+    for (int replicaId = 1; replicaId < tableDesc.getRegionReplication(); replicaId++) {
+      MutableObject<Throwable> error = new MutableObject<>();
+      replica2Error.put(replicaId, error);
       RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, replicaId);
-      futures.add(conn.replicate(replica, walEntries, retries, rpcTimeoutNs, operationTimeoutNs));
+      FutureUtils.addListener(
+        conn.replicate(replica, walEntries, retries, rpcTimeoutNs, operationTimeoutNs), (r, e) -> {
+          error.setValue(e);
+          if (remaining.decrementAndGet() == 0) {
+            onComplete(toSend, replica2Error);
+          }
+        });
     }
-    future = CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
-    FutureUtils.addListener(future, (r, e) -> {
-      if (e != null) {
-        // TODO: drop pending edits and issue a flush
-        LOG.warn("Failed to replicate to secondary replicas for {}", primary, e);
-      }
-      toSend.forEach(SinkEntry::replicated);
-      synchronized (entries) {
-        future = null;
-        if (stopping) {
-          stopped = true;
-          entries.notifyAll();
-          return;
-        }
-        if (!entries.isEmpty()) {
-          send();
-        }
-      }
-    });
+  }
+
+  private boolean flushAllStores(FlushDescriptor flushDesc) {
+    Set<byte[]> storesFlushed =
+      flushDesc.getStoreFlushesList().stream().map(sfd -> sfd.getFamilyName().toByteArray())
+        .collect(Collectors.toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR)));
+    if (storesFlushed.size() != tableDesc.getColumnFamilyCount()) {
+      return false;
+    }
+    return storesFlushed.containsAll(tableDesc.getColumnFamilyNames());
   }
 
   /**
@@ -178,7 +230,7 @@ public class RegionReplicationSink {
    * rpc call has cell scanner, which is off heap.
    */
   public void add(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
-    if (!hasRegionMemStoreReplication && !edit.isMetaEdit()) {
+    if (!tableDesc.hasRegionMemStoreReplication() && !edit.isMetaEdit()) {
       // only replicate meta edit if region memstore replication is not enabled
       return;
     }
@@ -186,10 +238,31 @@ public class RegionReplicationSink {
       if (stopping) {
         return;
       }
+      if (edit.isMetaEdit()) {
+        // check whether we flushed all stores, which means we could drop all the previous edits,
+        // and also, recover from the previous failure of some replicas
+        for (Cell metaCell : edit.getCells()) {
+          if (CellUtil.matchingFamily(metaCell, WALEdit.METAFAMILY)) {
+            FlushDescriptor flushDesc;
+            try {
+              flushDesc = WALEdit.getFlushDescriptor(metaCell);
+            } catch (IOException e) {
+              LOG.warn("Failed to parse FlushDescriptor from {}", metaCell);
+              continue;
+            }
+            if (flushDesc != null && flushAllStores(flushDesc)) {
+              LOG.debug("Got a flush all request, clear failed replicas {} and {} pending" +
+                " replication entries", failedReplicas, entries.size());
+              entries.clear();
+              failedReplicas.clear();
+            }
+          }
+        }
+      }
       // TODO: limit the total cached entries here, and we should have a global limitation, not for
       // only this region.
       entries.add(new SinkEntry(key, edit, rpcCall));
-      if (future == null) {
+      if (!sending) {
         send();
       }
     }
@@ -203,7 +276,7 @@ public class RegionReplicationSink {
   void stop() {
     synchronized (entries) {
       stopping = true;
-      if (future == null) {
+      if (!sending) {
         stopped = true;
         entries.notifyAll();
       }
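
The flushAllStores() helper added above decides whether a flush edit covers every column family of
the table, which is the condition for clearing the failed-replica set and the buffered entries. The
non-obvious part is that the flushed family names are byte[] values, so they have to be collected
into a set with an explicit comparator; a plain HashSet would compare the arrays by identity. A
JDK-only sketch of the same check (class and method names are illustrative):

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.Set;
    import java.util.TreeSet;

    // JDK-only sketch of the flushAllStores() condition: every column family of the table must
    // appear among the flushed families; byte[] keys need a comparator to be compared by value.
    public class FlushAllStoresCheckSketch {
      static boolean flushedAllStores(Set<byte[]> tableFamilies, Set<byte[]> flushedFamilies) {
        Set<byte[]> flushed = new TreeSet<>(Arrays::compare);
        flushed.addAll(flushedFamilies);
        if (flushed.size() != tableFamilies.size()) {
          return false;
        }
        return flushed.containsAll(tableFamilies);
      }

      public static void main(String[] args) {
        Set<byte[]> families = new TreeSet<>(Arrays::compare);
        families.add("cf1".getBytes(StandardCharsets.UTF_8));
        families.add("cf2".getBytes(StandardCharsets.UTF_8));

        Set<byte[]> flushed = new TreeSet<>(Arrays::compare);
        flushed.add("cf1".getBytes(StandardCharsets.UTF_8));
        System.out.println(flushedAllStores(families, flushed)); // false: cf2 was not flushed
        flushed.add("cf2".getBytes(StandardCharsets.UTF_8));
        System.out.println(flushedAllStores(families, flushed)); // true: flush covered all families
      }
    }
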
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaReplicationError.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaReplicationError.java
new file mode 100644
index 0000000..f88c4a9
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaReplicationError.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
+import org.apache.hadoop.hbase.StartTestingClusterOption;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.testclassification.FlakeyTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
+
+/**
+ * Test region replication when errors occur.
+ * <p/>
+ * We can not simply move the secondary replicas, as a flush is triggered on the primary replica
+ * when a secondary replica comes online, which always brings the data of the two regions back in
+ * sync. So here we need to simulate request errors instead.
+ */
+@Category({ FlakeyTests.class, LargeTests.class })
+public class TestRegionReplicaReplicationError {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRegionReplicaReplicationError.class);
+
+  public static final class ErrorReplayRSRpcServices extends RSRpcServices {
+
+    private final AtomicInteger count = new AtomicInteger(0);
+
+    public ErrorReplayRSRpcServices(HRegionServer rs) throws IOException {
+      super(rs);
+    }
+
+    @Override
+    public ReplicateWALEntryResponse replay(RpcController controller,
+      ReplicateWALEntryRequest request) throws ServiceException {
+      List<WALEntry> entries = request.getEntryList();
+      if (CollectionUtils.isEmpty(entries)) {
+        return ReplicateWALEntryResponse.getDefaultInstance();
+      }
+      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
+      HRegion region;
+      try {
+        region = server.getRegionByEncodedName(regionName.toStringUtf8());
+      } catch (NotServingRegionException e) {
+        throw new ServiceException(e);
+      }
+      // fail the first several requests
+      if (region.getRegionInfo().getReplicaId() == 1 && count.addAndGet(entries.size()) < 100) {
+        throw new ServiceException("Inject error!");
+      }
+      return super.replay(controller, request);
+    }
+
+  }
+
+  public static final class RSForTest
+    extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer {
+
+    public RSForTest(Configuration conf) throws IOException, InterruptedException {
+      super(conf);
+    }
+
+    @Override
+    protected RSRpcServices createRpcServices() throws IOException {
+      return new ErrorReplayRSRpcServices(this);
+    }
+  }
+
+  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
+
+  private static TableName TN = TableName.valueOf("test");
+
+  private static byte[] CF = Bytes.toBytes("cf");
+
+  private static byte[] CQ = Bytes.toBytes("cq");
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    HTU.getConfiguration().setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY,
+      true);
+    HTU.startMiniCluster(
+      StartTestingClusterOption.builder().rsClass(RSForTest.class).numRegionServers(3).build());
+    TableDescriptor td = TableDescriptorBuilder.newBuilder(TN).setRegionReplication(3)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build();
+    HTU.getAdmin().createTable(td);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    HTU.shutdownMiniCluster();
+  }
+
+  private boolean checkReplica(Table table, int replicaId) throws IOException {
+    boolean ret = true;
+    for (int i = 0; i < 500; i++) {
+      Result result = table.get(new Get(Bytes.toBytes(i)).setReplicaId(replicaId));
+      byte[] value = result.getValue(CF, CQ);
+      ret &= value != null && value.length > 0 && Bytes.toInt(value) == i;
+    }
+    return ret;
+  }
+
+  @Test
+  public void test() throws IOException {
+    try (Table table = HTU.getConnection().getTable(TN)) {
+      for (int i = 0; i < 500; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+      HTU.waitFor(30000, () -> checkReplica(table, 2));
+      HTU.waitFor(30000, () -> checkReplica(table, 1));
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
index 7dd4255..231c9e1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
@@ -53,8 +53,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
 
 /**
- * Tests RegionReplicaReplicationEndpoint class by setting up region replicas and verifying
- * async wal replication replays the edits to the secondary region in various scenarios.
+ * Tests region replication by setting up region replicas and verifying async wal replication
+ * replays the edits to the secondary region in various scenarios.
  */
 @Category({FlakeyTests.class, LargeTests.class})
 public class TestRegionReplicaReplication {

[hbase] 01/06: HBASE-26407 Introduce a region replication sink for sinking WAL edits to secondary replicas directly (#3807)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-26233
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 5f1f64e0e9e684751bbc8669e3b57ba59e97e2da
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Tue Nov 2 08:42:29 2021 +0800

    HBASE-26407 Introduce a region replication sink for sinking WAL edits to secondary replicas directly (#3807)
    
    Signed-off-by: Xiaolin Ha <ha...@apache.org>
---
 .../hbase/client/AsyncClusterConnection.java       |  13 +-
 .../hbase/client/AsyncClusterConnectionImpl.java   |  17 +-
 .../AsyncRegionReplicaReplayRetryingCaller.java    | 147 ------
 .../AsyncRegionReplicationRetryingCaller.java      | 103 +++++
 .../org/apache/hadoop/hbase/ipc/RpcServer.java     |  11 +
 .../hadoop/hbase/master/janitor/MetaFixer.java     |   9 -
 .../master/procedure/CreateTableProcedure.java     |  11 -
 .../master/procedure/ModifyTableProcedure.java     |   9 -
 .../apache/hadoop/hbase/regionserver/HRegion.java  |  79 +++-
 .../apache/hadoop/hbase/regionserver/HStore.java   |   3 +-
 .../MultiVersionConcurrencyControl.java            |  24 +-
 .../hbase/regionserver/RegionReplicationSink.java  | 228 +++++++++
 .../regionserver/handler/AssignRegionHandler.java  |  12 +-
 .../handler/UnassignRegionHandler.java             |  12 +-
 .../hbase/regionserver/wal/AbstractFSWAL.java      |   3 +-
 .../hadoop/hbase/regionserver/wal/WALUtil.java     |  44 +-
 .../regionserver/CatalogReplicationSource.java     |  47 --
 .../regionserver/CatalogReplicationSourcePeer.java |  50 --
 .../RegionReplicaReplicationEndpoint.java          | 407 ----------------
 .../regionserver/ReplicationSourceFactory.java     |   1 -
 .../regionserver/ReplicationSourceManager.java     |  87 ----
 .../hadoop/hbase/util/ServerRegionReplicaUtil.java |  32 --
 .../org/apache/hadoop/hbase/TestIOFencing.java     |   4 +-
 .../hbase/client/DummyAsyncClusterConnection.java  |  13 +-
 .../hadoop/hbase/regionserver/TestHRegion.java     |   2 +-
 .../regionserver/TestRegionReplicaFailover.java    |   4 +-
 ....java => TestMetaRegionReplicaReplication.java} | 184 ++------
 .../regionserver/TestRegionReplicaReplication.java | 273 +++++++++++
 .../TestRegionReplicaReplicationEndpoint.java      | 515 ---------------------
 ...stRegionReplicaReplicationEndpointNoMaster.java | 281 -----------
 30 files changed, 790 insertions(+), 1835 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index 9502424..11c4f4f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -58,13 +58,6 @@ public interface AsyncClusterConnection extends AsyncConnection {
   CompletableFuture<FlushRegionResponse> flush(byte[] regionName, boolean writeFlushWALMarker);
 
   /**
-   * Replicate wal edits for replica regions. The return value is the edits we skipped, as the
-   * original return value is useless.
-   */
-  CompletableFuture<Long> replay(TableName tableName, byte[] encodedRegionName, byte[] row,
-      List<Entry> entries, int replicaId, int numRetries, long operationTimeoutNs);
-
-  /**
    * Return all the replicas for a region. Used for region replica replication.
    */
   CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
@@ -110,4 +103,10 @@ public interface AsyncClusterConnection extends AsyncConnection {
    * Get the bootstrap node list of another region server.
    */
   CompletableFuture<List<ServerName>> getAllBootstrapNodes(ServerName regionServer);
+
+  /**
+   * Replicate wal edits to a secondary replica.
+   */
+  CompletableFuture<Void> replicate(RegionInfo replica, List<Entry> entries, int numRetries,
+    long rpcTimeoutNs, long operationTimeoutNs);
 }
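
The replacement method is intentionally simpler than the removed replay(): the caller passes the
target replica's RegionInfo and a batch of WAL entries and gets back a CompletableFuture<Void>. A
hedged usage sketch of the new API (the retry count and timeouts below are made-up values, and the
listener only prints; the real consumer is the RegionReplicationSink introduced in this commit):

    import java.util.List;
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hbase.client.AsyncClusterConnection;
    import org.apache.hadoop.hbase.client.RegionInfo;
    import org.apache.hadoop.hbase.util.FutureUtils;
    import org.apache.hadoop.hbase.wal.WAL;

    // Usage sketch: push a batch of WAL entries to one secondary replica without blocking the caller.
    final class ReplicateUsageSketch {
      static void replicateBatch(AsyncClusterConnection conn, RegionInfo secondaryReplica,
          List<WAL.Entry> entries) {
        long rpcTimeoutNs = TimeUnit.SECONDS.toNanos(1);       // made-up value
        long operationTimeoutNs = TimeUnit.SECONDS.toNanos(5); // made-up value
        FutureUtils.addListener(
          conn.replicate(secondaryReplica, entries, 3, rpcTimeoutNs, operationTimeoutNs), (r, e) -> {
            if (e != null) {
              // the real consumer (RegionReplicationSink) records the failed replica here
              System.out.println("replication to " + secondaryReplica + " failed: " + e);
            }
          });
      }
    }
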
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
index 825fbb4..789d616 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
@@ -85,14 +85,6 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu
   }
 
   @Override
-  public CompletableFuture<Long> replay(TableName tableName, byte[] encodedRegionName, byte[] row,
-      List<Entry> entries, int replicaId, int retries, long operationTimeoutNs) {
-    return new AsyncRegionReplicaReplayRetryingCaller(RETRY_TIMER, this,
-      ConnectionUtils.retries2Attempts(retries), operationTimeoutNs, tableName, encodedRegionName,
-      row, entries, replicaId).call();
-  }
-
-  @Override
   public CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
       boolean reload) {
     return getLocator().getRegionLocations(tableName, row, RegionLocateType.CURRENT, reload, -1L);
@@ -176,4 +168,13 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu
       });
     return future;
   }
+
+  @Override
+  public CompletableFuture<Void> replicate(RegionInfo replica,
+    List<Entry> entries, int retries, long rpcTimeoutNs,
+    long operationTimeoutNs) {
+    return new AsyncRegionReplicationRetryingCaller(RETRY_TIMER, this,
+      ConnectionUtils.retries2Attempts(retries), rpcTimeoutNs, operationTimeoutNs, replica, entries)
+        .call();
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicaReplayRetryingCaller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicaReplayRetryingCaller.java
deleted file mode 100644
index 0146c8b..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicaReplayRetryingCaller.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
-
-import java.io.IOException;
-import java.util.List;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
-
-/**
- * For replaying edits for region replica.
- * <p/>
- * The mainly difference here is that, every time after locating, we will check whether the region
- * name is equal, if not, we will give up, as this usually means the region has been split or
- * merged, and the new region(s) should already have all the data of the parent region(s).
- * <p/>
- * Notice that, the return value is the edits we skipped, as the original response message is not
- * used at upper layer.
- */
-@InterfaceAudience.Private
-public class AsyncRegionReplicaReplayRetryingCaller extends AsyncRpcRetryingCaller<Long> {
-
-  private static final Logger LOG =
-    LoggerFactory.getLogger(AsyncRegionReplicaReplayRetryingCaller.class);
-
-  private final TableName tableName;
-
-  private final byte[] encodedRegionName;
-
-  private final byte[] row;
-
-  private final Entry[] entries;
-
-  private final int replicaId;
-
-  public AsyncRegionReplicaReplayRetryingCaller(HashedWheelTimer retryTimer,
-      AsyncClusterConnectionImpl conn, int maxAttempts, long operationTimeoutNs,
-      TableName tableName, byte[] encodedRegionName, byte[] row, List<Entry> entries,
-      int replicaId) {
-    super(retryTimer, conn, ConnectionUtils.getPriority(tableName), conn.connConf.getPauseNs(),
-      conn.connConf.getPauseForCQTBENs(), maxAttempts, operationTimeoutNs,
-      conn.connConf.getWriteRpcTimeoutNs(), conn.connConf.getStartLogErrorsCnt());
-    this.tableName = tableName;
-    this.encodedRegionName = encodedRegionName;
-    this.row = row;
-    this.entries = entries.toArray(new Entry[0]);
-    this.replicaId = replicaId;
-  }
-
-  private void call(HRegionLocation loc) {
-    if (!Bytes.equals(encodedRegionName, loc.getRegion().getEncodedNameAsBytes())) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(
-          "Skipping {} entries in table {} because located region {} is different than" +
-            " the original region {} from WALEdit",
-          entries.length, tableName, loc.getRegion().getEncodedName(),
-          Bytes.toStringBinary(encodedRegionName));
-        for (Entry entry : entries) {
-          LOG.trace("Skipping : " + entry);
-        }
-      }
-      future.complete(Long.valueOf(entries.length));
-      return;
-    }
-
-    AdminService.Interface stub;
-    try {
-      stub = conn.getAdminStub(loc.getServerName());
-    } catch (IOException e) {
-      onError(e,
-        () -> "Get async admin stub to " + loc.getServerName() + " for '" +
-          Bytes.toStringBinary(row) + "' in " + loc.getRegion().getEncodedName() + " of " +
-          tableName + " failed",
-        err -> conn.getLocator().updateCachedLocationOnError(loc, err));
-      return;
-    }
-    Pair<ReplicateWALEntryRequest, CellScanner> p = ReplicationProtobufUtil
-      .buildReplicateWALEntryRequest(entries, encodedRegionName, null, null, null);
-    resetCallTimeout();
-    controller.setCellScanner(p.getSecond());
-    stub.replay(controller, p.getFirst(), r -> {
-      if (controller.failed()) {
-        onError(controller.getFailed(),
-          () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " +
-            loc.getRegion().getEncodedName() + " of " + tableName + " failed",
-          err -> conn.getLocator().updateCachedLocationOnError(loc, err));
-      } else {
-        future.complete(0L);
-      }
-    });
-
-  }
-
-  @Override
-  protected void doCall() {
-    long locateTimeoutNs;
-    if (operationTimeoutNs > 0) {
-      locateTimeoutNs = remainingTimeNs();
-      if (locateTimeoutNs <= 0) {
-        completeExceptionally();
-        return;
-      }
-    } else {
-      locateTimeoutNs = -1L;
-    }
-    addListener(conn.getLocator().getRegionLocation(tableName, row, replicaId,
-      RegionLocateType.CURRENT, locateTimeoutNs), (loc, error) -> {
-        if (error != null) {
-          onError(error,
-            () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed", err -> {
-            });
-          return;
-        }
-        call(loc);
-      });
-  }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java
new file mode 100644
index 0000000..a0ce418
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+
+/**
+ * For replicating edits to secondary replicas.
+ */
+@InterfaceAudience.Private
+public class AsyncRegionReplicationRetryingCaller extends AsyncRpcRetryingCaller<Void> {
+
+  private final RegionInfo replica;
+
+  private final Entry[] entries;
+
+  public AsyncRegionReplicationRetryingCaller(HashedWheelTimer retryTimer,
+    AsyncClusterConnectionImpl conn, int maxAttempts, long rpcTimeoutNs, long operationTimeoutNs,
+    RegionInfo replica, List<Entry> entries) {
+    super(retryTimer, conn, ConnectionUtils.getPriority(replica.getTable()),
+      conn.connConf.getPauseNs(), conn.connConf.getPauseForCQTBENs(), maxAttempts,
+      operationTimeoutNs, rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt());
+    this.replica = replica;
+    this.entries = entries.toArray(new Entry[0]);
+  }
+
+  private void call(HRegionLocation loc) {
+    AdminService.Interface stub;
+    try {
+      stub = conn.getAdminStub(loc.getServerName());
+    } catch (IOException e) {
+      onError(e,
+        () -> "Get async admin stub to " + loc.getServerName() + " for " + replica + " failed",
+        err -> conn.getLocator().updateCachedLocationOnError(loc, err));
+      return;
+    }
+    Pair<ReplicateWALEntryRequest, CellScanner> pair = ReplicationProtobufUtil
+      .buildReplicateWALEntryRequest(entries, replica.getEncodedNameAsBytes(), null, null, null);
+    resetCallTimeout();
+    controller.setCellScanner(pair.getSecond());
+    stub.replay(controller, pair.getFirst(), r -> {
+      if (controller.failed()) {
+        onError(controller.getFailed(),
+          () -> "Call to " + loc.getServerName() + " for " + replica + " failed",
+          err -> conn.getLocator().updateCachedLocationOnError(loc, err));
+      } else {
+        future.complete(null);
+      }
+    });
+  }
+
+  @Override
+  protected void doCall() {
+    long locateTimeoutNs;
+    if (operationTimeoutNs > 0) {
+      locateTimeoutNs = remainingTimeNs();
+      if (locateTimeoutNs <= 0) {
+        completeExceptionally();
+        return;
+      }
+    } else {
+      locateTimeoutNs = -1L;
+    }
+    addListener(conn.getLocator().getRegionLocation(replica.getTable(), replica.getStartKey(),
+      replica.getReplicaId(), RegionLocateType.CURRENT, locateTimeoutNs), (loc, error) -> {
+        if (error != null) {
+          onError(error, () -> "Locate " + replica + " failed", err -> {
+          });
+          return;
+        }
+        call(loc);
+      });
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index b41619a..9a7ba92 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -679,6 +679,17 @@ public abstract class RpcServer implements RpcServerInterface,
     return Optional.ofNullable(CurCall.get());
   }
 
+  /**
+   * Just return the current rpc call if it is a {@link ServerCall} and also has {@link CellScanner}
+   * attached.
+   * <p/>
+   * Mainly used for reference counting as {@link CellScanner} may reference non heap memory.
+   */
+  public static Optional<ServerCall<?>> getCurrentServerCallWithCellScanner() {
+    return getCurrentCall().filter(c -> c instanceof ServerCall)
+      .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall<?>) c);
+  }
+
   public static boolean isInRpcCallContext() {
     return CurCall.get() != null;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/janitor/MetaFixer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/janitor/MetaFixer.java
index 4a5aa0a..64ee49e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/janitor/MetaFixer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/janitor/MetaFixer.java
@@ -29,7 +29,6 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
-import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.TableName;
@@ -40,10 +39,8 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.exceptions.MergeRegionException;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
-import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -197,15 +194,9 @@ public class MetaFixer {
           MetaTableAccessor.addRegionsToMeta(masterServices.getConnection(), newRegions,
             td.getRegionReplication());
 
-          // Setup replication for region replicas if needed
-          if (td.getRegionReplication() > 1) {
-            ServerRegionReplicaUtil.setupRegionReplicaReplication(masterServices);
-          }
           return Either.<List<RegionInfo>, IOException> ofLeft(newRegions);
         } catch (IOException e) {
           return Either.<List<RegionInfo>, IOException> ofRight(e);
-        } catch (ReplicationException e) {
-          return Either.<List<RegionInfo>, IOException> ofRight(new HBaseIOException(e));
         }
       })
       .collect(Collectors.toList());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
index 2313e70..723f851 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
@@ -25,7 +25,6 @@ import java.util.function.Supplier;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableExistsException;
@@ -37,12 +36,10 @@ import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
-import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.ModifyRegionUtils;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -378,14 +375,6 @@ public class CreateTableProcedure
     // Add regions to META
     addRegionsToMeta(env, tableDescriptor, newRegions);
 
-    // Setup replication for region replicas if needed
-    if (tableDescriptor.getRegionReplication() > 1) {
-      try {
-        ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterServices());
-      } catch (ReplicationException e) {
-        throw new HBaseIOException(e);
-      }
-    }
     return newRegions;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
index 247dd9c..aedb42c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
@@ -38,10 +38,8 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
-import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -414,13 +412,6 @@ public class ModifyTableProcedure
         .collect(Collectors.toList());
       addChildProcedure(env.getAssignmentManager().createAssignProcedures(newReplicas));
     }
-    if (oldReplicaCount <= 1) {
-      try {
-        ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterServices());
-      } catch (ReplicationException e) {
-        throw new HBaseIOException(e);
-      }
-    }
   }
 
   private void closeExcessReplicasIfNeeded(MasterProcedureEnv env) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index dd9720c..c611a8a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
 import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
 import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
+
 import edu.umd.cs.findbugs.annotations.Nullable;
 import io.opentelemetry.api.trace.Span;
 import java.io.EOFException;
@@ -135,6 +136,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
 import org.apache.hadoop.hbase.ipc.RpcCall;
 import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.ServerCall;
 import org.apache.hadoop.hbase.mob.MobFileCache;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -177,6 +179,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -191,6 +194,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Service;
 import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
@@ -708,6 +712,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   private final StoreHotnessProtector storeHotnessProtector;
 
+  private Optional<RegionReplicationSink> regionReplicationSink = Optional.empty();
+
   /**
    * HRegion constructor. This constructor should only be used for testing and
    * extensions.  Instances of HRegion should be instantiated with the
@@ -1080,11 +1086,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       status.setStatus("Running coprocessor post-open hooks");
       coprocessorHost.postOpen();
     }
-
+    initializeRegionReplicationSink(reporter, status);
     status.markComplete("Region opened successfully");
     return nextSeqId;
   }
 
+  private void initializeRegionReplicationSink(CancelableProgressable reporter,
+    MonitoredTask status) {
+    RegionServerServices rss = getRegionServerServices();
+    TableDescriptor td = getTableDescriptor();
+    int regionReplication = td.getRegionReplication();
+    RegionInfo regionInfo = getRegionInfo();
+    if (regionReplication <= 1 || !RegionReplicaUtil.isDefaultReplica(regionInfo) ||
+      !ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(conf, regionInfo.getTable()) ||
+      rss == null) {
+      regionReplicationSink = Optional.empty();
+      return;
+    }
+    status.setStatus("Initializing region replication sink");
+    regionReplicationSink = Optional.of(new RegionReplicationSink(conf, regionInfo,
+      regionReplication, td.hasRegionMemStoreReplication(), rss.getAsyncClusterConnection()));
+  }
+
   /**
    * Open all Stores.
    * @param reporter
@@ -1207,7 +1230,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
       getRegionServerServices().getServerName(), storeFiles);
     WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionOpenDesc,
-        mvcc);
+        mvcc, regionReplicationSink.orElse(null));
   }
 
   private void writeRegionCloseMarker(WAL wal) throws IOException {
@@ -1216,7 +1239,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), mvcc.getReadPoint(),
       getRegionServerServices().getServerName(), storeFiles);
     WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionEventDesc,
-        mvcc);
+      mvcc, null);
 
     // Store SeqId in WAL FileSystem when a region closes
     // checking region folder exists is due to many tests which delete the table folder while a
@@ -1861,7 +1884,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         RegionReplicaUtil.isDefaultReplica(getRegionInfo())) {
         writeRegionCloseMarker(wal);
       }
-
+      if (regionReplicationSink.isPresent()) {
+        // stop replicating to secondary replicas
+        RegionReplicationSink sink = regionReplicationSink.get();
+        sink.stop();
+        try {
+          regionReplicationSink.get().waitUntilStopped();
+        } catch (InterruptedException e) {
+          throw throwOnInterrupt(e);
+        }
+      }
       this.closed.set(true);
       if (!canFlush) {
         decrMemStoreSize(this.memStoreSizing.getMemStoreSize());
@@ -2822,7 +2854,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             getRegionInfo(), flushOpSeqId, committedFiles);
         // No sync. Sync is below where no updates lock and we do FlushAction.COMMIT_FLUSH
         WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false,
-            mvcc);
+          mvcc, null);
       }
 
       // Prepare flush (take a snapshot)
@@ -2883,8 +2915,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     try {
       FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
           getRegionInfo(), flushOpSeqId, committedFiles);
-      WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false,
-          mvcc);
+      WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false, mvcc,
+        null);
     } catch (Throwable t) {
       LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL: {} in "
         + " region {}", StringUtils.stringifyException(t), this);
@@ -2928,8 +2960,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH,
         getRegionInfo(), -1, new TreeMap<>(Bytes.BYTES_COMPARATOR));
       try {
-        WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true,
-            mvcc);
+        WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true, mvcc,
+          null);
         return true;
       } catch (IOException e) {
         LOG.warn(getRegionInfo().getEncodedName() + " : "
@@ -3008,8 +3040,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         // write flush marker to WAL. If fail, we should throw DroppedSnapshotException
         FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
           getRegionInfo(), flushOpSeqId, committedFiles);
-        WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true,
-            mvcc);
+        WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true, mvcc,
+          regionReplicationSink.orElse(null));
       }
     } catch (Throwable t) {
       // An exception here means that the snapshot was not persisted.
@@ -3022,7 +3054,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         try {
           FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
             getRegionInfo(), flushOpSeqId, committedFiles);
-          WALUtil.writeFlushMarker(wal, this.replicationScope, getRegionInfo(), desc, false, mvcc);
+          WALUtil.writeFlushMarker(wal, this.replicationScope, getRegionInfo(), desc, false, mvcc,
+            null);
         } catch (Throwable ex) {
           LOG.warn(getRegionInfo().getEncodedName() + " : "
               + "failed writing ABORT_FLUSH marker to WAL", ex);
@@ -7067,10 +7100,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         try {
           WALProtos.BulkLoadDescriptor loadDescriptor =
               ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
-                  UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
-                  storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
+              UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
+              storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
           WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
-              loadDescriptor, mvcc);
+            loadDescriptor, mvcc, regionReplicationSink.orElse(null));
         } catch (IOException ioe) {
           if (this.rsServices != null) {
             // Have to abort region server because some hfiles has been loaded but we can't write
@@ -7757,21 +7790,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     if (this.coprocessorHost != null && !walEdit.isMetaEdit()) {
       this.coprocessorHost.preWALAppend(walKey, walEdit);
     }
-    WriteEntry writeEntry = null;
+    ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null);
     try {
       long txid = this.wal.appendData(this.getRegionInfo(), walKey, walEdit);
+      WriteEntry writeEntry = walKey.getWriteEntry();
+      regionReplicationSink.ifPresent(sink -> writeEntry.attachCompletionAction(() -> {
+        sink.add(walKey, walEdit, rpcCall);
+      }));
       // Call sync on our edit.
       if (txid != 0) {
         sync(txid, durability);
       }
-      writeEntry = walKey.getWriteEntry();
+      return writeEntry;
     } catch (IOException ioe) {
-      if (walKey != null && walKey.getWriteEntry() != null) {
+      if (walKey.getWriteEntry() != null) {
         mvcc.complete(walKey.getWriteEntry());
       }
       throw ioe;
     }
-    return writeEntry;
+
   }
 
   public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HRegion.class, false);
@@ -8426,6 +8463,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
+  public Optional<RegionReplicationSink> getRegionReplicationSink() {
+    return regionReplicationSink;
+  }
+
   public void addReadRequestsCount(long readRequestsCount) {
     this.readRequestsCount.add(readRequestsCount);
   }
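
For readers skimming the HRegion changes above, the following condensed sketch (illustrative only, not the literal patch code; the class and method names are made up for the example) shows how the append path is now wired: the WriteEntry is taken right after the WAL append and, if a sink is present, the edit is attached as a completion action, so it is handed to the secondary replicas only after its MVCC entry completes.

    import java.io.IOException;
    import java.util.Optional;
    import org.apache.hadoop.hbase.client.RegionInfo;
    import org.apache.hadoop.hbase.ipc.RpcServer;
    import org.apache.hadoop.hbase.ipc.ServerCall;
    import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
    import org.apache.hadoop.hbase.regionserver.RegionReplicationSink;
    import org.apache.hadoop.hbase.wal.WAL;
    import org.apache.hadoop.hbase.wal.WALEdit;
    import org.apache.hadoop.hbase.wal.WALKeyImpl;

    final class AppendWithSinkSketch {
      static WriteEntry appendAndAttachSink(WAL wal, RegionInfo info, WALKeyImpl key, WALEdit edit,
          Optional<RegionReplicationSink> sink) throws IOException {
        // Capture the current server call (if any) so off-heap cells stay referenced until replicated.
        ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null);
        long txid = wal.appendData(info, key, edit);
        WriteEntry writeEntry = key.getWriteEntry();
        // The sink sees the edit only when MVCC completes this entry, preserving the WAL edit order.
        sink.ifPresent(s -> writeEntry.attachCompletionAction(() -> s.add(key, edit, rpcCall)));
        if (txid != 0) {
          wal.sync(txid);
        }
        return writeEntry;
      }
    }
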
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 32693ab..73abc84 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1572,7 +1572,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     // Does this method belong in Region altogether given it is making so many references up there?
     // Could be Region#writeCompactionMarker(compactionDescriptor);
     WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(),
-        this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC());
+      this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC(),
+      region.getRegionReplicationSink().orElse(null));
   }
 
   void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
index d821eec..91ae03d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.util.LinkedList;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -202,6 +203,7 @@ public class MultiVersionConcurrencyControl {
         if (queueFirst.isCompleted()) {
           nextReadValue = queueFirst.getWriteNumber();
           writeQueue.removeFirst();
+          queueFirst.runCompletionAction();
         } else {
           break;
         }
@@ -271,22 +273,36 @@ public class MultiVersionConcurrencyControl {
    * Every created WriteEntry must be completed by calling mvcc#complete or #completeAndWait.
    */
   @InterfaceAudience.Private
-  public static class WriteEntry {
+  public static final class WriteEntry {
     private final long writeNumber;
     private boolean completed = false;
+    /**
+     * Will be called after completion, i.e., when the entry is removed from the
+     * {@link MultiVersionConcurrencyControl#writeQueue}.
+     */
+    private Optional<Runnable> completionAction = Optional.empty();
 
-    WriteEntry(long writeNumber) {
+    private WriteEntry(long writeNumber) {
       this.writeNumber = writeNumber;
     }
 
-    void markCompleted() {
+    private void markCompleted() {
       this.completed = true;
     }
 
-    boolean isCompleted() {
+    private boolean isCompleted() {
       return this.completed;
     }
 
+    public void attachCompletionAction(Runnable action) {
+      assert !completionAction.isPresent();
+      completionAction = Optional.of(action);
+    }
+
+    private void runCompletionAction() {
+      completionAction.ifPresent(Runnable::run);
+    }
+
     public long getWriteNumber() {
       return this.writeNumber;
     }
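
To make the completion-action contract above concrete, here is a tiny self-contained model (plain Java, not HBase code) of the same idea: an action attached to a write entry only runs when that entry is removed from the head of the write queue, i.e. once every earlier write has also completed, so actions fire strictly in write order.

    import java.util.ArrayDeque;
    import java.util.Optional;

    public class CompletionActionDemo {
      static final class Entry {
        final long writeNumber;
        boolean completed;
        Optional<Runnable> action = Optional.empty();
        Entry(long writeNumber) { this.writeNumber = writeNumber; }
      }

      private final ArrayDeque<Entry> queue = new ArrayDeque<>();
      private long nextWriteNumber = 1;

      synchronized Entry begin() {
        Entry e = new Entry(nextWriteNumber++);
        queue.addLast(e);
        return e;
      }

      synchronized void complete(Entry e) {
        e.completed = true;
        // Drain completed entries from the head; their actions run strictly in write order.
        while (!queue.isEmpty() && queue.peekFirst().completed) {
          queue.pollFirst().action.ifPresent(Runnable::run);
        }
      }

      public static void main(String[] args) {
        CompletionActionDemo mvcc = new CompletionActionDemo();
        Entry e1 = mvcc.begin();
        Entry e2 = mvcc.begin();
        e1.action = Optional.of(() -> System.out.println("replicate edit 1"));
        e2.action = Optional.of(() -> System.out.println("replicate edit 2"));
        mvcc.complete(e2); // nothing runs yet, e1 is still pending at the head of the queue
        mvcc.complete(e1); // now both actions run, in order: edit 1, then edit 2
      }
    }
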
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java
new file mode 100644
index 0000000..6911289
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.ipc.ServerCall;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
+/**
+ * The class for replicating WAL edits to secondary replicas, one instance per region.
+ */
+@InterfaceAudience.Private
+public class RegionReplicationSink {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RegionReplicationSink.class);
+
+  public static final String MAX_PENDING_SIZE = "hbase.region.read-replica.sink.max-pending-size";
+
+  public static final long MAX_PENDING_SIZE_DEFAULT = 10L * 1024 * 1024;
+
+  public static final String RETRIES_NUMBER = "hbase.region.read-replica.sink.retries.number";
+
+  public static final int RETRIES_NUMBER_DEFAULT = 3;
+
+  public static final String RPC_TIMEOUT_MS = "hbase.region.read-replica.sink.rpc.timeout.ms";
+
+  public static final long RPC_TIMEOUT_MS_DEFAULT = 200;
+
+  public static final String OPERATION_TIMEOUT_MS =
+    "hbase.region.read-replica.sink.operation.timeout.ms";
+
+  public static final long OPERATION_TIMEOUT_MS_DEFAULT = 1000;
+
+  private static final class SinkEntry {
+
+    final WALKeyImpl key;
+
+    final WALEdit edit;
+
+    final ServerCall<?> rpcCall;
+
+    SinkEntry(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
+      this.key = key;
+      this.edit = edit;
+      this.rpcCall = rpcCall;
+      if (rpcCall != null) {
+        // increase the reference count to avoid the rpc framework freeing the memory before we
+        // actually send the entries out.
+        rpcCall.retainByWAL();
+      }
+    }
+
+    /**
+     * Should be called regardless of the result of the replication operation. Unless you still
+     * want to reuse this entry, you must call this method to release the possible off-heap
+     * memory.
+     */
+    void replicated() {
+      if (rpcCall != null) {
+        rpcCall.releaseByWAL();
+      }
+    }
+  }
+
+  private final RegionInfo primary;
+
+  private final int regionReplication;
+
+  private final boolean hasRegionMemStoreReplication;
+
+  private final Queue<SinkEntry> entries = new ArrayDeque<>();
+
+  private final AsyncClusterConnection conn;
+
+  private final int retries;
+
+  private final long rpcTimeoutNs;
+
+  private final long operationTimeoutNs;
+
+  private CompletableFuture<Void> future;
+
+  private boolean stopping;
+
+  private boolean stopped;
+
+  RegionReplicationSink(Configuration conf, RegionInfo primary, int regionReplication,
+    boolean hasRegionMemStoreReplication, AsyncClusterConnection conn) {
+    Preconditions.checkArgument(RegionReplicaUtil.isDefaultReplica(primary), "%s is not primary",
+      primary);
+    Preconditions.checkArgument(regionReplication > 1,
+      "region replication should be greater than 1 but got %s", regionReplication);
+    this.primary = primary;
+    this.regionReplication = regionReplication;
+    this.hasRegionMemStoreReplication = hasRegionMemStoreReplication;
+    this.conn = conn;
+    this.retries = conf.getInt(RETRIES_NUMBER, RETRIES_NUMBER_DEFAULT);
+    this.rpcTimeoutNs =
+      TimeUnit.MILLISECONDS.toNanos(conf.getLong(RPC_TIMEOUT_MS, RPC_TIMEOUT_MS_DEFAULT));
+    this.operationTimeoutNs = TimeUnit.MILLISECONDS
+      .toNanos(conf.getLong(OPERATION_TIMEOUT_MS, OPERATION_TIMEOUT_MS_DEFAULT));
+  }
+
+  private void send() {
+    List<SinkEntry> toSend = new ArrayList<>();
+    for (SinkEntry entry;;) {
+      entry = entries.poll();
+      if (entry == null) {
+        break;
+      }
+      toSend.add(entry);
+    }
+    List<WAL.Entry> walEntries =
+      toSend.stream().map(e -> new WAL.Entry(e.key, e.edit)).collect(Collectors.toList());
+    List<CompletableFuture<Void>> futures = new ArrayList<>();
+    for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
+      RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, replicaId);
+      futures.add(conn.replicate(replica, walEntries, retries, rpcTimeoutNs, operationTimeoutNs));
+    }
+    future = CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
+    FutureUtils.addListener(future, (r, e) -> {
+      if (e != null) {
+        // TODO: drop pending edits and issue a flush
+        LOG.warn("Failed to replicate to secondary replicas for {}", primary, e);
+      }
+      toSend.forEach(SinkEntry::replicated);
+      synchronized (entries) {
+        future = null;
+        if (stopping) {
+          stopped = true;
+          entries.notifyAll();
+          return;
+        }
+        if (!entries.isEmpty()) {
+          send();
+        }
+      }
+    });
+  }
+
+  /**
+   * Add this edit to the replication queue.
+   * <p/>
+   * The {@code rpcCall} is for retaining the cells if the edit is built within an rpc call and the
+   * rpc call has a cell scanner, whose cells may be stored off heap.
+   */
+  public void add(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
+    if (!hasRegionMemStoreReplication && !edit.isMetaEdit()) {
+      // if region memstore replication is not enabled, only meta edits are replicated
+      return;
+    }
+    synchronized (entries) {
+      if (stopping) {
+        return;
+      }
+      // TODO: limit the total cached entries here, and we should have a global limitation, not for
+      // only this region.
+      entries.add(new SinkEntry(key, edit, rpcCall));
+      if (future == null) {
+        send();
+      }
+    }
+  }
+
+  /**
+   * Stop the replication sink.
+   * <p/>
+   * Usually this should only be called when you want to close a region.
+   */
+  void stop() {
+    synchronized (entries) {
+      stopping = true;
+      if (future == null) {
+        stopped = true;
+        entries.notifyAll();
+      }
+    }
+  }
+
+  /**
+   * Wait until all pending replication requests have finished.
+   * <p/>
+   * After this method returns, there will be no new replication requests to the secondary
+   * replicas.
+   * <p/>
+   * This is used to keep the replication order consistent with the WAL edit order when writing.
+   */
+  void waitUntilStopped() throws InterruptedException {
+    synchronized (entries) {
+      while (!stopped) {
+        entries.wait();
+      }
+    }
+  }
+}
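
The new sink also introduces several tunables: retry count, rpc timeout, operation timeout, and a pending-size cap (defined above but, per the TODO in add(), not yet enforced at this point). Below is an illustrative way to override them programmatically; the values are arbitrary examples, not recommendations from the patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.regionserver.RegionReplicationSink;

    public class SinkTuningExample {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        conf.setInt(RegionReplicationSink.RETRIES_NUMBER, 5);                    // default 3
        conf.setLong(RegionReplicationSink.RPC_TIMEOUT_MS, 500);                 // default 200
        conf.setLong(RegionReplicationSink.OPERATION_TIMEOUT_MS, 2000);          // default 1000
        conf.setLong(RegionReplicationSink.MAX_PENDING_SIZE, 32L * 1024 * 1024); // default 10 MB
      }
    }
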
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
index 5d9819c..101c9c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
@@ -22,9 +22,7 @@ import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
@@ -34,10 +32,10 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices.PostOpenDeployContext;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
 import org.apache.hadoop.hbase.util.RetryCounter;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 
 /**
@@ -134,14 +132,6 @@ public class AssignRegionHandler extends EventHandler {
       // pass null for the last parameter, which used to be a CancelableProgressable, as now the
       // opening can not be interrupted by a close request any more.
       Configuration conf = rs.getConfiguration();
-      TableName tn = htd.getTableName();
-      if (ServerRegionReplicaUtil.isMetaRegionReplicaReplicationEnabled(conf, tn)) {
-        if (RegionReplicaUtil.isDefaultReplica(this.regionInfo.getReplicaId())) {
-          // Add the hbase:meta replication source on replica zero/default.
-          rs.getReplicationSourceService().getReplicationManager().
-            addCatalogReplicationSource(this.regionInfo);
-        }
-      }
       region = HRegion.openHRegion(regionInfo, htd, rs.getWAL(regionInfo), conf, rs, null);
     } catch (IOException e) {
       cleanUpAndReportFailure(e);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
index 0d02f30..2ac55ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -31,10 +30,10 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.RetryCounter;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 
 /**
@@ -121,15 +120,6 @@ public class UnassignRegionHandler extends EventHandler {
     }
 
     rs.removeRegion(region, destination);
-    if (ServerRegionReplicaUtil.isMetaRegionReplicaReplicationEnabled(rs.getConfiguration(),
-        region.getTableDescriptor().getTableName())) {
-      if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo().getReplicaId())) {
-        // If hbase:meta read replicas enabled, remove replication source for hbase:meta Regions.
-        // See assign region handler where we add the replication source on open.
-        rs.getReplicationSourceService().getReplicationManager().
-          removeCatalogReplicationSource(region.getRegionInfo());
-      }
-    }
     if (!rs.reportRegionStateTransition(
       new RegionStateTransitionContext(TransitionCode.CLOSED, HConstants.NO_SEQNUM, closeProcId,
         -1, region.getRegionInfo()))) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 002d9b7..f0e4a1f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -1164,8 +1164,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
       txidHolder.setValue(ringBuffer.next());
     });
     long txid = txidHolder.longValue();
-    ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
-      .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
+    ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null);
     try {
       FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
       entry.stampRegionSequenceId(we);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
index 23db3dc..15be95e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
@@ -28,7 +28,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
+import org.apache.hadoop.hbase.regionserver.RegionReplicationSink;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -71,9 +74,9 @@ public class WALUtil {
    */
   public static WALKeyImpl writeCompactionMarker(WAL wal,
     NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final CompactionDescriptor c,
-    MultiVersionConcurrencyControl mvcc) throws IOException {
+    MultiVersionConcurrencyControl mvcc, RegionReplicationSink sink) throws IOException {
     WALKeyImpl walKey =
-      writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null);
+      writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null, sink);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
     }
@@ -86,10 +89,10 @@ public class WALUtil {
    * This write is for internal use only. Not for external client consumption.
    */
   public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap<byte[], Integer> replicationScope,
-    RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
-    throws IOException {
+    RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc,
+    RegionReplicationSink sink) throws IOException {
     WALKeyImpl walKey = doFullMarkerAppendTransaction(wal, replicationScope, hri,
-      WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync);
+      WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync, sink);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
     }
@@ -102,9 +105,9 @@ public class WALUtil {
    */
   public static WALKeyImpl writeRegionEventMarker(WAL wal,
     NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, RegionEventDescriptor r,
-    MultiVersionConcurrencyControl mvcc) throws IOException {
-    WALKeyImpl walKey =
-      writeMarker(wal, replicationScope, hri, WALEdit.createRegionEventWALEdit(hri, r), mvcc, null);
+    MultiVersionConcurrencyControl mvcc, RegionReplicationSink sink) throws IOException {
+    WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
+      WALEdit.createRegionEventWALEdit(hri, r), mvcc, null, sink);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
     }
@@ -122,11 +125,11 @@ public class WALUtil {
    * @throws IOException We will throw an IOException if we can not append to the HLog.
    */
   public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal,
-      final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
-      final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc)
-    throws IOException {
+    final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
+    final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc,
+    final RegionReplicationSink sink) throws IOException {
     WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
-      WALEdit.createBulkLoadEvent(hri, desc), mvcc, null);
+      WALEdit.createBulkLoadEvent(hri, desc), mvcc, null, sink);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
     }
@@ -135,11 +138,11 @@ public class WALUtil {
 
   private static WALKeyImpl writeMarker(final WAL wal,
     final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, final WALEdit edit,
-    final MultiVersionConcurrencyControl mvcc, final Map<String, byte[]> extendedAttributes)
-    throws IOException {
+    final MultiVersionConcurrencyControl mvcc, final Map<String, byte[]> extendedAttributes,
+    final RegionReplicationSink sink) throws IOException {
     // If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT
     return doFullMarkerAppendTransaction(wal, replicationScope, hri, edit, mvcc, extendedAttributes,
-      true);
+      true, sink);
   }
 
   /**
@@ -152,19 +155,24 @@ public class WALUtil {
    */
   private static WALKeyImpl doFullMarkerAppendTransaction(final WAL wal,
     final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, final WALEdit edit,
-    final MultiVersionConcurrencyControl mvcc,
-    final Map<String, byte[]> extendedAttributes, final boolean sync) throws IOException {
+    final MultiVersionConcurrencyControl mvcc, final Map<String, byte[]> extendedAttributes,
+    final boolean sync, final RegionReplicationSink sink) throws IOException {
     // TODO: Pass in current time to use?
     WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(),
       EnvironmentEdgeManager.currentTime(), mvcc, replicationScope, extendedAttributes);
     long trx = MultiVersionConcurrencyControl.NONE;
     try {
       trx = wal.appendMarker(hri, walKey, edit);
+      WriteEntry writeEntry = walKey.getWriteEntry();
+      if (sink != null) {
+        writeEntry.attachCompletionAction(() -> sink.add(walKey, edit,
+          RpcServer.getCurrentServerCallWithCellScanner().orElse(null)));
+      }
       if (sync) {
         wal.sync(trx);
       }
       // Call complete only here because these are markers only. They are not for clients to read.
-      mvcc.complete(walKey.getWriteEntry());
+      mvcc.complete(writeEntry);
     } catch (IOException ioe) {
       if (walKey.getWriteEntry() != null) {
         mvcc.complete(walKey.getWriteEntry());
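
For context on the sink parameter threaded through these marker helpers, here is an illustrative fragment in the spirit of the HRegion flush changes earlier in this patch (a paraphrase, not the literal code): markers that secondary replicas must act on, such as COMMIT_FLUSH and bulk-load events, receive the region's sink, while ABORT_FLUSH and CANNOT_FLUSH markers pass null and stay local.

    // Illustrative fragment; `region`, `wal`, `mvcc`, `flushOpSeqId` and `committedFiles` are
    // assumed to be in scope as they are in HRegion's flush path.
    FlushDescriptor commit = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
      region.getRegionInfo(), flushOpSeqId, committedFiles);
    WALUtil.writeFlushMarker(wal, region.getReplicationScope(), region.getRegionInfo(), commit,
      true, mvcc, region.getRegionReplicationSink().orElse(null));
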
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java
deleted file mode 100644
index 8cb7860..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import java.util.Collections;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * ReplicationSource that reads catalog WAL files -- e.g. hbase:meta WAL files -- and lets through
- * all WALEdits from these WALs. This ReplicationSource is NOT created via
- * {@link ReplicationSourceFactory}.
- */
-@InterfaceAudience.Private
-class CatalogReplicationSource extends ReplicationSource {
-  CatalogReplicationSource() {
-    // Filters in hbase:meta WAL files and allows all edits, including 'meta' edits (these are
-    // filtered out in the 'super' class default implementation).
-    super(p -> AbstractFSWALProvider.isMetaFile(p), Collections.emptyList());
-  }
-
-  @Override
-  public void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {
-    // Noop. This CatalogReplicationSource implementation does not persist state to backing storage
-    // nor does it keep its WALs in a general map up in ReplicationSourceManager --
-    // CatalogReplicationSource is used by the Catalog Read Replica feature which resets everytime
-    // the WAL source process crashes. Skip calling through to the default implementation.
-    // See "4.1 Skip maintaining zookeeper replication queue (offsets/WALs)" in the
-    // design doc attached to HBASE-18070 'Enable memstore replication for meta replica for detail'
-    // for background on why no need to keep WAL state.
-  }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSourcePeer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSourcePeer.java
deleted file mode 100644
index 3bcd414..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSourcePeer.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
-import org.apache.hadoop.hbase.replication.SyncReplicationState;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * The 'peer' used internally by Catalog Region Replicas Replication Source.
- * The Replication system has 'peer' baked into its core so though we do not need 'peering', we
- * need a 'peer' and its configuration else the replication system breaks at a few locales.
- * Set "hbase.region.replica.catalog.replication" if you want to change the configured endpoint.
- */
-@InterfaceAudience.Private
-class CatalogReplicationSourcePeer extends ReplicationPeerImpl {
-  /**
-   * @param clusterKey Usually the UUID from zk passed in by caller as a String.
-   */
-  CatalogReplicationSourcePeer(Configuration configuration, String clusterKey) {
-    super(configuration, ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER + "_catalog",
-      ReplicationPeerConfig.newBuilder().
-        setClusterKey(clusterKey).
-        setReplicationEndpointImpl(
-          configuration.get("hbase.region.replica.catalog.replication",
-            RegionReplicaReplicationEndpoint.class.getName())).
-        setBandwidth(0). // '0' means no bandwidth.
-        setSerial(false).
-        build(),
-      true, SyncReplicationState.NONE, SyncReplicationState.NONE);
-  }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
deleted file mode 100644
index 17e7a53..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ /dev/null
@@ -1,407 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.TreeMap;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.TableDescriptors;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.AsyncClusterConnection;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.WALEntryFilter;
-import org.apache.hadoop.hbase.util.AtomicUtils;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FutureUtils;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.RetryCounter;
-import org.apache.hadoop.hbase.util.RetryCounterFactory;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
-import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
-import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
-import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
-
-/**
- * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint which receives the WAL
- * edits from the WAL, and sends the edits to replicas of regions.
- */
-@InterfaceAudience.Private
-public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
-
-  private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaReplicationEndpoint.class);
-
-  // Can be configured differently than hbase.client.retries.number
-  private static String CLIENT_RETRIES_NUMBER =
-    "hbase.region.replica.replication.client.retries.number";
-
-  private Configuration conf;
-  private AsyncClusterConnection connection;
-  private TableDescriptors tableDescriptors;
-
-  private int numRetries;
-
-  private long operationTimeoutNs;
-
-  private LoadingCache<TableName, Optional<TableDescriptor>> tableDescriptorCache;
-
-  private Cache<TableName, TableName> disabledTableCache;
-
-  private final RetryCounterFactory retryCounterFactory =
-    new RetryCounterFactory(Integer.MAX_VALUE, 1000, 60000);
-
-  @Override
-  public void init(Context context) throws IOException {
-    super.init(context);
-    this.conf = context.getConfiguration();
-    this.tableDescriptors = context.getTableDescriptors();
-    int memstoreReplicationEnabledCacheExpiryMs = conf
-      .getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000);
-    // A cache for the table "memstore replication enabled" flag.
-    // It has a default expiry of 5 sec. This means that if the table is altered
-    // with a different flag value, we might miss to replicate for that amount of
-    // time. But this cache avoid the slow lookup and parsing of the TableDescriptor.
-    tableDescriptorCache = CacheBuilder.newBuilder()
-      .expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, TimeUnit.MILLISECONDS)
-      .initialCapacity(10).maximumSize(1000)
-      .build(new CacheLoader<TableName, Optional<TableDescriptor>>() {
-
-        @Override
-        public Optional<TableDescriptor> load(TableName tableName) throws Exception {
-          // check if the table requires memstore replication
-          // some unit-test drop the table, so we should do a bypass check and always replicate.
-          return Optional.ofNullable(tableDescriptors.get(tableName));
-        }
-      });
-    int nonExistentTableCacheExpiryMs =
-      conf.getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
-    // A cache for non existing tables that have a default expiry of 5 sec. This means that if the
-    // table is created again with the same name, we might miss to replicate for that amount of
-    // time. But this cache prevents overloading meta requests for every edit from a deleted file.
-    disabledTableCache = CacheBuilder.newBuilder()
-      .expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS).initialCapacity(10)
-      .maximumSize(1000).build();
-    // HRS multiplies client retries by 10 globally for meta operations, but we do not want this.
-    // We are resetting it here because we want default number of retries (35) rather than 10 times
-    // that which makes very long retries for disabled tables etc.
-    int defaultNumRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
-    if (defaultNumRetries > 10) {
-      int mult = conf.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER,
-        HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER);
-      defaultNumRetries = defaultNumRetries / mult; // reset if HRS has multiplied this already
-    }
-    this.numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries);
-    // use the regular RPC timeout for replica replication RPC's
-    this.operationTimeoutNs =
-      TimeUnit.MILLISECONDS.toNanos(conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
-        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
-    this.connection = context.getServer().getAsyncClusterConnection();
-  }
-
-  /**
-   * returns true if the specified entry must be replicated. We should always replicate meta
-   * operations (e.g. flush) and use the user HTD flag to decide whether or not replicate the
-   * memstore.
-   */
-  private boolean requiresReplication(Optional<TableDescriptor> tableDesc,
-      Entry entry) {
-    // empty edit does not need to be replicated
-    if (entry.getEdit().isEmpty() || !tableDesc.isPresent()) {
-      return false;
-    }
-    // meta edits (e.g. flush) must be always replicated
-    return entry.getEdit().isMetaEdit() || tableDesc.get().hasRegionMemStoreReplication();
-  }
-
-  private void getRegionLocations(CompletableFuture<RegionLocations> future,
-      TableDescriptor tableDesc, byte[] encodedRegionName, byte[] row, boolean reload) {
-    FutureUtils.addListener(connection.getRegionLocations(tableDesc.getTableName(), row, reload),
-      (locs, e) -> {
-        if (e != null) {
-          future.completeExceptionally(e);
-          return;
-        }
-        // if we are not loading from cache, just return
-        if (reload) {
-          future.complete(locs);
-          return;
-        }
-        // check if the number of region replicas is correct, and also the primary region name
-        // matches.
-        if (locs.size() == tableDesc.getRegionReplication() &&
-          locs.getDefaultRegionLocation() != null &&
-          Bytes.equals(locs.getDefaultRegionLocation().getRegion().getEncodedNameAsBytes(),
-            encodedRegionName)) {
-          future.complete(locs);
-        } else {
-          // reload again as the information in cache maybe stale
-          getRegionLocations(future, tableDesc, encodedRegionName, row, true);
-        }
-      });
-  }
-
-  private void replicate(CompletableFuture<Long> future, RegionLocations locs,
-      TableDescriptor tableDesc, byte[] encodedRegionName, byte[] row, List<Entry> entries) {
-    if (locs.size() == 1) {
-      LOG.info("Only one location for {}.{}, refresh the location cache only for meta now",
-        tableDesc.getTableName(), Bytes.toString(encodedRegionName));
-
-      // This could happen to meta table. In case of meta table comes with no replica and
-      // later it is changed to multiple replicas. The cached location for meta may only has
-      // the primary region. In this case, it needs to clean up and refresh the cached meta
-      // locations.
-      if (tableDesc.isMetaTable()) {
-        connection.getRegionLocator(tableDesc.getTableName()).clearRegionLocationCache();
-      }
-      future.complete(Long.valueOf(entries.size()));
-      return;
-    }
-    RegionInfo defaultReplica = locs.getDefaultRegionLocation().getRegion();
-    if (!Bytes.equals(defaultReplica.getEncodedNameAsBytes(), encodedRegionName)) {
-      // the region name is not equal, this usually means the region has been split or merged, so
-      // give up replicating as the new region(s) should already have all the data of the parent
-      // region(s).
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(
-          "Skipping {} entries in table {} because located region {} is different than" +
-            " the original region {} from WALEdit",
-          tableDesc.getTableName(), defaultReplica.getEncodedName(),
-          Bytes.toStringBinary(encodedRegionName));
-      }
-      future.complete(Long.valueOf(entries.size()));
-      return;
-    }
-    AtomicReference<Throwable> error = new AtomicReference<>();
-    AtomicInteger remainingTasks = new AtomicInteger(locs.size() - 1);
-    AtomicLong skippedEdits = new AtomicLong(0);
-
-    for (int i = 1, n = locs.size(); i < n; i++) {
-      // Do not use the elements other than the default replica as they may be null. We will fail
-      // earlier if the location for default replica is null.
-      final RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(defaultReplica, i);
-      FutureUtils
-        .addListener(connection.replay(tableDesc.getTableName(), replica.getEncodedNameAsBytes(),
-          row, entries, replica.getReplicaId(), numRetries, operationTimeoutNs), (r, e) -> {
-            if (e != null) {
-              LOG.warn("Failed to replicate to {}", replica, e);
-              error.compareAndSet(null, e);
-            } else {
-              AtomicUtils.updateMax(skippedEdits, r.longValue());
-            }
-            if (remainingTasks.decrementAndGet() == 0) {
-              if (error.get() != null) {
-                future.completeExceptionally(error.get());
-              } else {
-                future.complete(skippedEdits.get());
-              }
-            }
-          });
-    }
-  }
-
-  private void logSkipped(TableName tableName, List<Entry> entries, String reason) {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Skipping {} entries because table {} is {}", entries.size(), tableName, reason);
-      for (Entry entry : entries) {
-        LOG.trace("Skipping : {}", entry);
-      }
-    }
-  }
-
-  private CompletableFuture<Long> replicate(TableDescriptor tableDesc, byte[] encodedRegionName,
-      List<Entry> entries) {
-    if (disabledTableCache.getIfPresent(tableDesc.getTableName()) != null) {
-      logSkipped(tableDesc.getTableName(), entries, "cached as a disabled table");
-      return CompletableFuture.completedFuture(Long.valueOf(entries.size()));
-    }
-    byte[] row = CellUtil.cloneRow(entries.get(0).getEdit().getCells().get(0));
-    CompletableFuture<RegionLocations> locateFuture = new CompletableFuture<>();
-    getRegionLocations(locateFuture, tableDesc, encodedRegionName, row, false);
-    CompletableFuture<Long> future = new CompletableFuture<>();
-    FutureUtils.addListener(locateFuture, (locs, error) -> {
-      if (error != null) {
-        future.completeExceptionally(error);
-      } else if (locs.getDefaultRegionLocation() == null) {
-        future.completeExceptionally(
-          new HBaseIOException("No location found for default replica of table=" +
-            tableDesc.getTableName() + " row='" + Bytes.toStringBinary(row) + "'"));
-      } else {
-        replicate(future, locs, tableDesc, encodedRegionName, row, entries);
-      }
-    });
-    return future;
-  }
-
-  @Override
-  public boolean replicate(ReplicateContext replicateContext) {
-    Map<byte[], Pair<TableDescriptor, List<Entry>>> encodedRegionName2Entries =
-      new TreeMap<>(Bytes.BYTES_COMPARATOR);
-    long skippedEdits = 0;
-    RetryCounter retryCounter = retryCounterFactory.create();
-    outer: while (isRunning()) {
-      encodedRegionName2Entries.clear();
-      skippedEdits = 0;
-      for (Entry entry : replicateContext.getEntries()) {
-        Optional<TableDescriptor> tableDesc;
-        try {
-          tableDesc = tableDescriptorCache.get(entry.getKey().getTableName());
-        } catch (ExecutionException e) {
-          LOG.warn("Failed to load table descriptor for {}, attempts={}",
-            entry.getKey().getTableName(), retryCounter.getAttemptTimes(), e.getCause());
-          if (!retryCounter.shouldRetry()) {
-            return false;
-          }
-          try {
-            retryCounter.sleepUntilNextRetry();
-          } catch (InterruptedException e1) {
-            // restore the interrupted state
-            Thread.currentThread().interrupt();
-            return false;
-          }
-          continue outer;
-        }
-        if (!requiresReplication(tableDesc, entry)) {
-          skippedEdits++;
-          continue;
-        }
-        byte[] encodedRegionName = entry.getKey().getEncodedRegionName();
-        encodedRegionName2Entries
-          .computeIfAbsent(encodedRegionName, k -> Pair.newPair(tableDesc.get(), new ArrayList<>()))
-          .getSecond().add(entry);
-      }
-      break;
-    }
-    // send the request to regions
-    retryCounter = retryCounterFactory.create();
-    while (isRunning()) {
-      List<Pair<CompletableFuture<Long>, byte[]>> futureAndEncodedRegionNameList =
-        new ArrayList<Pair<CompletableFuture<Long>, byte[]>>();
-      for (Map.Entry<byte[], Pair<TableDescriptor, List<Entry>>> entry : encodedRegionName2Entries
-        .entrySet()) {
-        CompletableFuture<Long> future =
-          replicate(entry.getValue().getFirst(), entry.getKey(), entry.getValue().getSecond());
-        futureAndEncodedRegionNameList.add(Pair.newPair(future, entry.getKey()));
-      }
-      for (Pair<CompletableFuture<Long>, byte[]> pair : futureAndEncodedRegionNameList) {
-        byte[] encodedRegionName = pair.getSecond();
-        try {
-          skippedEdits += pair.getFirst().get().longValue();
-          encodedRegionName2Entries.remove(encodedRegionName);
-        } catch (InterruptedException e) {
-          // restore the interrupted state
-          Thread.currentThread().interrupt();
-          return false;
-        } catch (ExecutionException e) {
-          Pair<TableDescriptor, List<Entry>> tableAndEntries =
-            encodedRegionName2Entries.get(encodedRegionName);
-          TableName tableName = tableAndEntries.getFirst().getTableName();
-          List<Entry> entries = tableAndEntries.getSecond();
-          Throwable cause = e.getCause();
-          // The table can be disabled or dropped at this time. For disabled tables, we have no
-          // cheap mechanism to detect this case because meta does not contain this information.
-          // ClusterConnection.isTableDisabled() is a zk call which we cannot do for every replay
-          // RPC. So instead we start the replay RPC with retries and check whether the table is
-          // dropped or disabled which might cause SocketTimeoutException, or
-          // RetriesExhaustedException or similar if we get IOE.
-          if (cause instanceof TableNotFoundException) {
-            // add to cache that the table does not exist
-            tableDescriptorCache.put(tableName, Optional.empty());
-            logSkipped(tableName, entries, "dropped");
-            skippedEdits += entries.size();
-            encodedRegionName2Entries.remove(encodedRegionName);
-            continue;
-          }
-          boolean disabled = false;
-          try {
-            disabled = connection.getAdmin().isTableDisabled(tableName).get();
-          } catch (InterruptedException e1) {
-            // restore the interrupted state
-            Thread.currentThread().interrupt();
-            return false;
-          } catch (ExecutionException e1) {
-            LOG.warn("Failed to test whether {} is disabled, assume it is not disabled", tableName,
-              e1.getCause());
-          }
-          if (disabled) {
-            disabledTableCache.put(tableName, tableName);
-            logSkipped(tableName, entries, "disabled");
-            skippedEdits += entries.size();
-            encodedRegionName2Entries.remove(encodedRegionName);
-            continue;
-          }
-          LOG.warn("Failed to replicate {} entries for region {} of table {}", entries.size(),
-            Bytes.toStringBinary(encodedRegionName), tableName);
-        }
-      }
-      // we have done
-      if (encodedRegionName2Entries.isEmpty()) {
-        ctx.getMetrics().incrLogEditsFiltered(skippedEdits);
-        return true;
-      } else {
-        LOG.warn("Failed to replicate all entries, retry={}", retryCounter.getAttemptTimes());
-        if (!retryCounter.shouldRetry()) {
-          return false;
-        }
-        try {
-          retryCounter.sleepUntilNextRetry();
-        } catch (InterruptedException e) {
-          // restore the interrupted state
-          Thread.currentThread().interrupt();
-          return false;
-        }
-      }
-    }
-
-    return false;
-  }
-
-  @Override
-  public boolean canReplicateToSameCluster() {
-    return true;
-  }
-
-  @Override
-  protected WALEntryFilter getScopeWALEntryFilter() {
-    // we do not care about scope. We replicate everything.
-    return null;
-  }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
index 8863f14..b055902 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
@@ -27,7 +27,6 @@ import org.slf4j.LoggerFactory;
 /**
  * Constructs a {@link ReplicationSourceInterface}
  * Note, not used to create specialized ReplicationSources
- * @see CatalogReplicationSource
  */
 @InterfaceAudience.Private
 public final class ReplicationSourceFactory {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 73efcfe..9f8d8dc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -39,7 +39,6 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -49,8 +48,6 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
@@ -64,9 +61,7 @@ import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
-import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -183,16 +178,6 @@ public class ReplicationSourceManager {
   private final MetricsReplicationGlobalSourceSource globalMetrics;
 
   /**
-   * A special ReplicationSource for hbase:meta Region Read Replicas.
-   * Usually this reference remains empty. If an hbase:meta Region is opened on this server, we
-   * will create an instance of a hbase:meta CatalogReplicationSource and it will live the life of
-   * the Server thereafter; i.e. we will not shut it down even if the hbase:meta moves away from
-   * this server (in case it later gets moved back). We synchronize on this instance testing for
-   * presence and if absent, while creating so only created and started once.
-   */
-  AtomicReference<ReplicationSourceInterface> catalogReplicationSource = new AtomicReference<>();
-
-  /**
    * Creates a replication manager and sets the watch on all the other registered region servers
    * @param queueStorage the interface for manipulating replication queues
    * @param conf the configuration to use
@@ -1066,78 +1051,6 @@ public class ReplicationSourceManager {
     return this.globalMetrics;
   }
 
-  /**
-   * Add an hbase:meta Catalog replication source. Called on open of an hbase:meta Region.
-   * Create it once only. If exists already, use the existing one.
-   * @see #removeCatalogReplicationSource(RegionInfo)
-   * @see #addSource(String) This is specialization on the addSource method.
-   */
-  public ReplicationSourceInterface addCatalogReplicationSource(RegionInfo regionInfo)
-      throws IOException {
-    // Poor-man's putIfAbsent
-    synchronized (this.catalogReplicationSource) {
-      ReplicationSourceInterface rs = this.catalogReplicationSource.get();
-      return rs != null ? rs :
-        this.catalogReplicationSource.getAndSet(createCatalogReplicationSource(regionInfo));
-    }
-  }
-
-  /**
-   * Remove the hbase:meta Catalog replication source.
-   * Called when we close hbase:meta.
-   * @see #addCatalogReplicationSource(RegionInfo regionInfo)
-   */
-  public void removeCatalogReplicationSource(RegionInfo regionInfo) {
-    // Nothing to do. Leave any CatalogReplicationSource in place in case an hbase:meta Region
-    // comes back to this server.
-  }
-
-  /**
-   * Create, initialize, and start the Catalog ReplicationSource.
-   * Presumes called one-time only (caller must ensure one-time only call).
-   * This ReplicationSource is NOT created via {@link ReplicationSourceFactory}.
-   * @see #addSource(String) This is a specialization of the addSource call.
-   * @see #catalogReplicationSource for a note on this ReplicationSource's lifecycle (and more on
-   *    why the special handling).
-   */
-  private ReplicationSourceInterface createCatalogReplicationSource(RegionInfo regionInfo)
-      throws IOException {
-    // Instantiate meta walProvider. Instantiated here or over in the #warmupRegion call made by the
-    // Master on a 'move' operation. Need to do extra work if we did NOT instantiate the provider.
-    WALProvider walProvider = this.walFactory.getMetaWALProvider();
-    boolean instantiate = walProvider == null;
-    if (instantiate) {
-      walProvider = this.walFactory.getMetaProvider();
-    }
-    // Here we do a specialization on what {@link ReplicationSourceFactory} does. There is no need
-    // for persisting offset into WALs up in zookeeper (via ReplicationQueueInfo) as the catalog
-    // read replicas feature that makes use of the source does a reset on a crash of the WAL
-    // source process. See "4.1 Skip maintaining zookeeper replication queue (offsets/WALs)" in the
-    // design doc attached to HBASE-18070 'Enable memstore replication for meta replica' for detail.
-    CatalogReplicationSourcePeer peer = new CatalogReplicationSourcePeer(this.conf,
-      this.clusterId.toString());
-    final ReplicationSourceInterface crs = new CatalogReplicationSource();
-    crs.init(conf, fs, this, new NoopReplicationQueueStorage(), peer, server, peer.getId(),
-      clusterId, walProvider.getWALFileLengthProvider(), new MetricsSource(peer.getId()));
-    // Add listener on the provider so we can pick up the WAL to replicate on roll.
-    WALActionsListener listener = new WALActionsListener() {
-      @Override public void postLogRoll(Path oldPath, Path newPath) throws IOException {
-        crs.enqueueLog(newPath);
-      }
-    };
-    walProvider.addWALActionsListener(listener);
-    if (!instantiate) {
-      // If we did not instantiate provider, need to add our listener on already-created WAL
-      // instance too (listeners are passed by provider to WAL instance on creation but if provider
-      // created already, our listener add above is missed). And add the current WAL file to the
-      // Replication Source so it can start replicating it.
-      WAL wal = walProvider.getWAL(regionInfo);
-      wal.registerWALActionsListener(listener);
-      crs.enqueueLog(((AbstractFSWAL)wal).getCurrentFileName());
-    }
-    return crs.startup();
-  }
-
   ReplicationQueueStorage getQueueStorage() {
     return queueStorage;
   }
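
The catalogReplicationSource handling removed above guards creation by synchronizing on an
AtomicReference so the source is created and started at most once. Below is a minimal,
self-contained sketch of that once-only lazy-initialization pattern; the names are illustrative
and it is not part of this patch. Note that AtomicReference.getAndSet returns the previous value,
so the sketch stores and returns the new instance explicitly.

    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Supplier;

    // Illustrative sketch of the "create once, reuse thereafter" pattern used by the removed
    // addCatalogReplicationSource. Names here are hypothetical, not HBase API.
    final class LazySingleton<T> {
      private final AtomicReference<T> ref = new AtomicReference<>();

      // Synchronize on the reference while testing for presence and while creating,
      // so the value is created exactly once even under concurrent callers.
      T getOrCreate(Supplier<T> factory) {
        synchronized (ref) {
          T existing = ref.get();
          if (existing != null) {
            return existing;
          }
          T created = factory.get();
          ref.set(created);
          return created; // return the newly created value, not the previous (null) one
        }
      }
    }
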
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
index 5583a47..6a46bd4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
@@ -22,23 +22,15 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.Reference;
-import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Similar to {@link RegionReplicaUtil} but for the server side
@@ -46,8 +38,6 @@ import org.slf4j.LoggerFactory;
 @InterfaceAudience.Private
 public class ServerRegionReplicaUtil extends RegionReplicaUtil {
 
-  private static final Logger LOG = LoggerFactory.getLogger(ServerRegionReplicaUtil.class);
-
   /**
    * Whether asynchronous WAL replication to the secondary region replicas is enabled or not.
    * If this is enabled, a replication peer named "region_replica_replication" will be created
@@ -59,7 +49,6 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
   public static final String REGION_REPLICA_REPLICATION_CONF_KEY
     = "hbase.region.replica.replication.enabled";
   private static final boolean DEFAULT_REGION_REPLICA_REPLICATION = false;
-  public static final String REGION_REPLICA_REPLICATION_PEER = "region_replica_replication";
 
   /**
    * Same as for {@link #REGION_REPLICA_REPLICATION_CONF_KEY} but for catalog replication.
@@ -162,27 +151,6 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
   }
 
   /**
-   * Create replication peer for replicating user-space Region Read Replicas.
-   * This method should only be called on the master side.
-   */
-  public static void setupRegionReplicaReplication(MasterServices services)
-    throws IOException, ReplicationException {
-    if (!isRegionReplicaReplicationEnabled(services.getConfiguration())) {
-      return;
-    }
-    if (services.getReplicationPeerManager().getPeerConfig(REGION_REPLICA_REPLICATION_PEER)
-      .isPresent()) {
-      return;
-    }
-    LOG.info("Region replica replication peer id=" + REGION_REPLICA_REPLICATION_PEER +
-      " not exist. Creating...");
-    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
-      .setClusterKey(ZKConfig.getZooKeeperClusterKey(services.getConfiguration()))
-      .setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName()).build();
-    services.addReplicationPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig, true);
-  }
-
-  /**
    * @return True if Region Read Replica is enabled for <code>tn</code> (whether hbase:meta or
    *   user-space tables).
    */
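
The REGION_REPLICA_REPLICATION_CONF_KEY switch documented above stays in place even though the
peer-creation helper is removed. A minimal sketch of flipping that switch in a Configuration
before a cluster (or test cluster) is started; this is only an illustration, not code from the
patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;

    // Sketch: enable asynchronous WAL replication to secondary region replicas in the
    // configuration handed to the cluster.
    public class EnableRegionReplicaReplication {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
        // ... pass conf on to the cluster / HBaseTestingUtil from here
      }
    }
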
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index 3c2bc3f..fe16c31 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -296,8 +296,8 @@ public class TestIOFencing {
         FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
         new Path("store_dir"));
       WALUtil.writeCompactionMarker(compactingRegion.getWAL(),
-          ((HRegion)compactingRegion).getReplicationScope(),
-        oldHri, compactionDescriptor, compactingRegion.getMVCC());
+        ((HRegion) compactingRegion).getReplicationScope(), oldHri, compactionDescriptor,
+        compactingRegion.getMVCC(), null);
 
       // Wait till flush has happened, otherwise there won't be multiple store files
       long startWaitTime = EnvironmentEdgeManager.currentTime();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
index 4b10110..ef3511c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
@@ -126,12 +126,6 @@ public class DummyAsyncClusterConnection implements AsyncClusterConnection {
   }
 
   @Override
-  public CompletableFuture<Long> replay(TableName tableName, byte[] encodedRegionName, byte[] row,
-      List<Entry> entries, int replicaId, int numRetries, long operationTimeoutNs) {
-    return null;
-  }
-
-  @Override
   public CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
       boolean reload) {
     return null;
@@ -169,4 +163,11 @@ public class DummyAsyncClusterConnection implements AsyncClusterConnection {
   public CompletableFuture<List<ServerName>> getAllBootstrapNodes(ServerName regionServer) {
     return null;
   }
+
+  @Override
+  public CompletableFuture<Void> replicate(RegionInfo replica,
+    List<Entry> entries, int numRetries, long rpcTimeoutNs,
+    long operationTimeoutNs) {
+    return null;
+  }
 }
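
DummyAsyncClusterConnection above stubs the new replicate(...) call by returning null. A test
double that needs the call to complete could instead return an already-completed future; a small
sketch with the same parameter list as the override added above (illustrative only, not part of
the patch):

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import org.apache.hadoop.hbase.client.RegionInfo;
    import org.apache.hadoop.hbase.wal.WAL.Entry;

    // Hypothetical stub that acknowledges every replicate() call instead of returning null.
    class AcknowledgingReplicateStub {
      CompletableFuture<Void> replicate(RegionInfo replica, List<Entry> entries, int numRetries,
          long rpcTimeoutNs, long operationTimeoutNs) {
        // Pretend the entries were shipped to the secondary replica successfully.
        return CompletableFuture.completedFuture(null);
      }
    }
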
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index a5584ff..187dca5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -996,7 +996,7 @@ public class TestHRegion {
             region.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
 
       WALUtil.writeCompactionMarker(region.getWAL(), this.region.getReplicationScope(),
-          this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
+          this.region.getRegionInfo(), compactionDescriptor, region.getMVCC(), null);
 
       Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java
index ea6589d..ba7e9d1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.client.Consistency;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.replication.regionserver.TestRegionReplicaReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.regionserver.TestRegionReplicaReplication;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
@@ -64,7 +64,7 @@ public class TestRegionReplicaFailover {
       HBaseClassTestRule.forClass(TestRegionReplicaFailover.class);
 
   private static final Logger LOG =
-      LoggerFactory.getLogger(TestRegionReplicaReplicationEndpoint.class);
+      LoggerFactory.getLogger(TestRegionReplicaReplication.class);
 
   private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java
similarity index 74%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java
index 5a06a11..b501ab2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java
@@ -20,14 +20,13 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Objects;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -36,7 +35,6 @@ import org.apache.hadoop.hbase.ClientMetaTableAccessor;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
@@ -45,7 +43,6 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
@@ -67,18 +64,17 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Tests RegionReplicaReplicationEndpoint class for hbase:meta by setting up region replicas and
- * verifying async wal replication replays the edits to the secondary region in various scenarios.
- *
- * @see TestRegionReplicaReplicationEndpoint
+ * Tests region replication for hbase:meta by setting up region replicas and verifying async wal
+ * replication replays the edits to the secondary region in various scenarios.
+ * @see TestRegionReplicaReplication
  */
-@Category({LargeTests.class})
-public class TestMetaRegionReplicaReplicationEndpoint {
+@Category({ LargeTests.class })
+public class TestMetaRegionReplicaReplication {
+
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestMetaRegionReplicaReplicationEndpoint.class);
-  private static final Logger LOG =
-    LoggerFactory.getLogger(TestMetaRegionReplicaReplicationEndpoint.class);
+    HBaseClassTestRule.forClass(TestMetaRegionReplicaReplication.class);
+  private static final Logger LOG = LoggerFactory.getLogger(TestMetaRegionReplicaReplication.class);
   private static final int NB_SERVERS = 4;
   private final HBaseTestingUtil HTU = new HBaseTestingUtil();
   private int numOfMetaReplica = NB_SERVERS - 1;
@@ -102,17 +98,15 @@ public class TestMetaRegionReplicaReplicationEndpoint {
     conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
     conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
     // Enable hbase:meta replication.
-    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY,
-      true);
+    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
     // Set hbase:meta replicas to be 3.
     // conf.setInt(HConstants.META_REPLICAS_NUM, numOfMetaReplica);
     HTU.startMiniCluster(NB_SERVERS);
     // Enable hbase:meta replication.
     HBaseTestingUtil.setReplicas(HTU.getAdmin(), TableName.META_TABLE_NAME, numOfMetaReplica);
 
-    HTU.waitFor(30000,
-      () -> HTU.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME).size()
-      >= numOfMetaReplica);
+    HTU.waitFor(30000, () -> HTU.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME)
+      .size() >= numOfMetaReplica);
   }
 
   @After
@@ -121,83 +115,19 @@ public class TestMetaRegionReplicaReplicationEndpoint {
   }
 
   /**
-   * Assert that the ReplicationSource for hbase:meta gets created when hbase:meta is opened.
-   */
-  @Test
-  public void testHBaseMetaReplicationSourceCreatedOnOpen() throws Exception {
-    SingleProcessHBaseCluster cluster = HTU.getMiniHBaseCluster();
-    HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta());
-    // Replicate a row to prove all working.
-    testHBaseMetaReplicatesOneRow(0);
-    assertTrue(isMetaRegionReplicaReplicationSource(hrs));
-    // Now move the hbase:meta and make sure the ReplicationSource is in both places.
-    HRegionServer hrsOther = null;
-    for (int i = 0; i < cluster.getNumLiveRegionServers(); i++) {
-      hrsOther = cluster.getRegionServer(i);
-      if (hrsOther.getServerName().equals(hrs.getServerName())) {
-        hrsOther = null;
-        continue;
-      }
-      break;
-    }
-    assertNotNull(hrsOther);
-    assertFalse(isMetaRegionReplicaReplicationSource(hrsOther));
-    Region meta = null;
-    for (Region region : hrs.getOnlineRegionsLocalContext()) {
-      if (region.getRegionInfo().isMetaRegion()) {
-        meta = region;
-        break;
-      }
-    }
-    assertNotNull(meta);
-    HTU.moveRegionAndWait(meta.getRegionInfo(), hrsOther.getServerName());
-    // Assert that there is a ReplicationSource in both places now.
-    assertTrue(isMetaRegionReplicaReplicationSource(hrs));
-    assertTrue(isMetaRegionReplicaReplicationSource(hrsOther));
-    // Replicate to show stuff still works.
-    testHBaseMetaReplicatesOneRow(1);
-    // Now pretend a few hours have gone by... roll the meta WAL in original location... Move the
-    // meta back and retry replication. See if it works.
-    hrs.getWAL(meta.getRegionInfo()).rollWriter(true);
-    testHBaseMetaReplicatesOneRow(2);
-    hrs.getWAL(meta.getRegionInfo()).rollWriter(true);
-    testHBaseMetaReplicatesOneRow(3);
-  }
-
-  /**
-   * Test meta region replica replication. Create some tables and see if replicas pick up the
-   * additions.
-   */
-  private void testHBaseMetaReplicatesOneRow(int i) throws Exception {
-    waitForMetaReplicasToOnline();
-    try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_" + i),
-      HConstants.CATALOG_FAMILY)) {
-      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
-    }
-  }
-
-  /**
-   * @return Whether the special meta region replica peer is enabled on <code>hrs</code>
-   */
-  private boolean isMetaRegionReplicaReplicationSource(HRegionServer hrs) {
-    return hrs.getReplicationSourceService().getReplicationManager().
-      catalogReplicationSource.get() != null;
-  }
-
-  /**
    * Test meta region replica replication. Create some tables and see if replicas pick up the
    * additions.
    */
   @Test
   public void testHBaseMetaReplicates() throws Exception {
-    try (Table table = HTU
-      .createTable(TableName.valueOf(this.name.getMethodName() + "_0"), HConstants.CATALOG_FAMILY,
-        Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
+    try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_0"),
+      HConstants.CATALOG_FAMILY,
+      Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
       verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
     }
-    try (Table table = HTU
-      .createTable(TableName.valueOf(this.name.getMethodName() + "_1"), HConstants.CATALOG_FAMILY,
-        Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
+    try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_1"),
+      HConstants.CATALOG_FAMILY,
+      Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
       verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
       // Try delete.
       HTU.deleteTableIfAny(table.getName());
@@ -207,26 +137,22 @@ public class TestMetaRegionReplicaReplicationEndpoint {
 
   @Test
   public void testCatalogReplicaReplicationWithFlushAndCompaction() throws Exception {
-    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
-    TableName tableName = TableName.valueOf("hbase:meta");
-    Table table = connection.getTable(tableName);
-    try {
+    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+      Table table = connection.getTable(TableName.META_TABLE_NAME)) {
       // load the data to the table
       for (int i = 0; i < 5; i++) {
         LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
         HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
         LOG.info("flushing table");
-        HTU.flush(tableName);
+        HTU.flush(TableName.META_TABLE_NAME);
         LOG.info("compacting table");
         if (i < 4) {
-          HTU.compact(tableName, false);
+          HTU.compact(TableName.META_TABLE_NAME, false);
         }
       }
 
-      verifyReplication(tableName, numOfMetaReplica, 0, 5000, HConstants.CATALOG_FAMILY);
-    } finally {
-      table.close();
-      connection.close();
+      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, 0, 5000,
+        HConstants.CATALOG_FAMILY);
     }
   }
 
@@ -235,7 +161,6 @@ public class TestMetaRegionReplicaReplicationEndpoint {
     SingleProcessHBaseCluster cluster = HTU.getMiniHBaseCluster();
     HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta());
 
-    HRegionServer hrsMetaReplica = null;
     HRegionServer hrsNoMetaReplica = null;
     HRegionServer server = null;
     Region metaReplica = null;
@@ -260,11 +185,8 @@ public class TestMetaRegionReplicaReplicationEndpoint {
         hrsNoMetaReplica = server;
       }
     }
-
-    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
-    TableName tableName = TableName.valueOf("hbase:meta");
-    Table table = connection.getTable(tableName);
-    try {
+    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+      Table table = connection.getTable(TableName.META_TABLE_NAME)) {
       // load the data to the table
       for (int i = 0; i < 5; i++) {
         LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
@@ -274,10 +196,8 @@ public class TestMetaRegionReplicaReplicationEndpoint {
         }
       }
 
-      verifyReplication(tableName, numOfMetaReplica, 0, 5000, HConstants.CATALOG_FAMILY);
-    } finally {
-      table.close();
-      connection.close();
+      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, 0, 5000,
+        HConstants.CATALOG_FAMILY);
     }
   }
 
@@ -324,22 +244,6 @@ public class TestMetaRegionReplicaReplicationEndpoint {
   }
 
   /**
-   * Replicas come online after primary.
-   */
-  private void waitForMetaReplicasToOnline() throws IOException {
-    final RegionLocator regionLocator =
-      HTU.getConnection().getRegionLocator(TableName.META_TABLE_NAME);
-    HTU.waitFor(10000,
-      // getRegionLocations returns an entry for each replica but if unassigned, entry is null.
-      // Pass reload to force us to skip cache else it just keeps returning default.
-      () -> regionLocator.getRegionLocations(HConstants.EMPTY_START_ROW, true).stream().
-        filter(Objects::nonNull).count() >= numOfMetaReplica);
-    List<HRegionLocation> locations = regionLocator.getRegionLocations(HConstants.EMPTY_START_ROW);
-    LOG.info("Found locations {}", locations);
-    assertEquals(numOfMetaReplica, locations.size());
-  }
-
-  /**
    * Scan hbase:meta for <code>tableName</code> content.
    */
   private List<Result> getMetaCells(TableName tableName) throws IOException {
@@ -373,20 +277,9 @@ public class TestMetaRegionReplicaReplicationEndpoint {
     return regions;
   }
 
-  private Region getOneRegion(TableName tableName) {
-    for (int i = 0; i < NB_SERVERS; i++) {
-      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
-      List<HRegion> onlineRegions = rs.getRegions(tableName);
-      if (onlineRegions.size() > 1) {
-        return onlineRegions.get(0);
-      }
-    }
-    return null;
-  }
-
   /**
-   * Verify when a Table is deleted from primary, then there are no references in replicas
-   * (because they get the delete of the table rows too).
+   * Verify when a Table is deleted from primary, then there are no references in replicas (because
+   * they get the delete of the table rows too).
    */
   private void verifyDeletedReplication(TableName tableName, int regionReplication,
     final TableName deletedTableName) {
@@ -417,8 +310,8 @@ public class TestMetaRegionReplicaReplicationEndpoint {
   }
 
   /**
-   * Cells are from hbase:meta replica so will start w/ 'tableName,'; i.e. the tablename followed
-   * by HConstants.DELIMITER. Make sure the deleted table is no longer present in passed
+   * Cells are from hbase:meta replica so will start w/ 'tableName,'; i.e. the tablename followed by
+   * HConstants.DELIMITER. Make sure the deleted table is no longer present in passed
    * <code>cells</code>.
    */
   private boolean doesNotContain(List<Cell> cells, TableName tableName) {
@@ -491,21 +384,19 @@ public class TestMetaRegionReplicaReplicationEndpoint {
   }
 
   private void primaryNoChangeReplicaIncrease(final long[] before, final long[] after) {
-    assertEquals(before[RegionInfo.DEFAULT_REPLICA_ID],
-      after[RegionInfo.DEFAULT_REPLICA_ID]);
+    assertEquals(before[RegionInfo.DEFAULT_REPLICA_ID], after[RegionInfo.DEFAULT_REPLICA_ID]);
 
-    for (int i = 1; i < after.length; i ++) {
+    for (int i = 1; i < after.length; i++) {
       assertTrue(after[i] > before[i]);
     }
   }
 
   private void primaryIncreaseReplicaNoChange(final long[] before, final long[] after) {
     // There are read requests increase for primary meta replica.
-    assertTrue(after[RegionInfo.DEFAULT_REPLICA_ID] >
-      before[RegionInfo.DEFAULT_REPLICA_ID]);
+    assertTrue(after[RegionInfo.DEFAULT_REPLICA_ID] > before[RegionInfo.DEFAULT_REPLICA_ID]);
 
     // No change for replica regions
-    for (int i = 1; i < after.length; i ++) {
+    for (int i = 1; i < after.length; i++) {
       assertEquals(before[i], after[i]);
     }
   }
@@ -515,13 +406,12 @@ public class TestMetaRegionReplicaReplicationEndpoint {
     for (Region r : metaRegions) {
       LOG.info("read request for region {} is {}", r, r.getReadRequestsCount());
       counters[i] = r.getReadRequestsCount();
-      i ++;
+      i++;
     }
   }
 
   @Test
   public void testHBaseMetaReplicaGets() throws Exception {
-
     TableName tn = TableName.valueOf(this.name.getMethodName());
     final Region[] metaRegions = getAllRegions(TableName.META_TABLE_NAME, numOfMetaReplica);
     long[] readReqsForMetaReplicas = new long[numOfMetaReplica];
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
new file mode 100644
index 0000000..7dd4255
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.testclassification.FlakeyTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
+
+/**
+ * Tests region replica replication by setting up region replicas and verifying async WAL
+ * replication replays the edits to the secondary region in various scenarios.
+ */
+@Category({FlakeyTests.class, LargeTests.class})
+public class TestRegionReplicaReplication {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestRegionReplicaReplication.class);
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestRegionReplicaReplication.class);
+
+  private static final int NB_SERVERS = 2;
+
+  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
+
+  @Rule
+  public TestName name = new TestName();
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    Configuration conf = HTU.getConfiguration();
+    conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
+    conf.setInt("replication.source.size.capacity", 10240);
+    conf.setLong("replication.source.sleepforretries", 100);
+    conf.setInt("hbase.regionserver.maxlogs", 10);
+    conf.setLong("hbase.master.logcleaner.ttl", 10);
+    conf.setInt("zookeeper.recovery.retry", 1);
+    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
+    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
+    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+    conf.setInt("replication.stats.thread.period.seconds", 5);
+    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // fewer retries are needed
+    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
+
+    HTU.startMiniCluster(NB_SERVERS);
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    HTU.shutdownMiniCluster();
+  }
+
+  private void testRegionReplicaReplication(int regionReplication) throws Exception {
+    // Test region replica replication: create a table with a single region, write some data,
+    // and ensure that the data is replicated to the secondary region.
+    TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_"
+        + regionReplication);
+    TableDescriptor htd = HTU
+      .createModifyableTableDescriptor(TableName.valueOf(tableName.toString()),
+        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
+        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
+      .setRegionReplication(regionReplication).build();
+    createOrEnableTableWithRetries(htd, true);
+    TableName tableNameNoReplicas =
+        TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS");
+    HTU.deleteTableIfAny(tableNameNoReplicas);
+    HTU.createTable(tableNameNoReplicas, HBaseTestingUtil.fam1);
+
+    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+      Table table = connection.getTable(tableName);
+      Table tableNoReplicas = connection.getTable(tableNameNoReplicas)) {
+      // load some data to the non-replicated table
+      HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtil.fam1, 6000, 7000);
+
+      // load the data to the table
+      HTU.loadNumericRows(table, HBaseTestingUtil.fam1, 0, 1000);
+
+      verifyReplication(tableName, regionReplication, 0, 1000);
+    } finally {
+      HTU.deleteTableIfAny(tableNameNoReplicas);
+    }
+  }
+
+  private void verifyReplication(TableName tableName, int regionReplication,
+      final int startRow, final int endRow) throws Exception {
+    verifyReplication(tableName, regionReplication, startRow, endRow, true);
+  }
+
+  private void verifyReplication(TableName tableName, int regionReplication,
+      final int startRow, final int endRow, final boolean present) throws Exception {
+    // find the regions
+    final Region[] regions = new Region[regionReplication];
+
+    for (int i=0; i < NB_SERVERS; i++) {
+      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
+      List<HRegion> onlineRegions = rs.getRegions(tableName);
+      for (HRegion region : onlineRegions) {
+        regions[region.getRegionInfo().getReplicaId()] = region;
+      }
+    }
+
+    for (Region region : regions) {
+      assertNotNull(region);
+    }
+
+    for (int i = 1; i < regionReplication; i++) {
+      final Region region = regions[i];
+      // wait until all the data is replicated to all secondary regions
+      Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          LOG.info("verifying replication for region replica:" + region.getRegionInfo());
+          try {
+            HTU.verifyNumericRows(region, HBaseTestingUtil.fam1, startRow, endRow, present);
+          } catch(Throwable ex) {
+            LOG.warn("Verification from secondary region is not complete yet", ex);
+            // still wait
+            return false;
+          }
+          return true;
+        }
+      });
+    }
+  }
+
+  @Test
+  public void testRegionReplicaReplicationWith2Replicas() throws Exception {
+    testRegionReplicaReplication(2);
+  }
+
+  @Test
+  public void testRegionReplicaReplicationWith3Replicas() throws Exception {
+    testRegionReplicaReplication(3);
+  }
+
+  @Test
+  public void testRegionReplicaReplicationWith10Replicas() throws Exception {
+    testRegionReplicaReplication(10);
+  }
+
+  @Test
+  public void testRegionReplicaWithoutMemstoreReplication() throws Exception {
+    int regionReplication = 3;
+    TableDescriptor htd = HTU.createModifyableTableDescriptor(name.getMethodName())
+      .setRegionReplication(regionReplication).setRegionMemStoreReplication(false).build();
+    createOrEnableTableWithRetries(htd, true);
+    final TableName tableName = htd.getTableName();
+    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+    Table table = connection.getTable(tableName);
+    try {
+      // write data to the primary. The replicas should not receive the data
+      final int STEP = 100;
+      for (int i = 0; i < 3; ++i) {
+        final int startRow = i * STEP;
+        final int endRow = (i + 1) * STEP;
+        LOG.info("Writing data from " + startRow + " to " + endRow);
+        HTU.loadNumericRows(table, HBaseTestingUtil.fam1, startRow, endRow);
+        verifyReplication(tableName, regionReplication, startRow, endRow, false);
+
+        // Flush the table, now the data should show up in the replicas
+        LOG.info("flushing table");
+        HTU.flush(tableName);
+        verifyReplication(tableName, regionReplication, 0, endRow, true);
+      }
+    } finally {
+      table.close();
+      connection.close();
+    }
+  }
+
+  @Test
+  public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception {
+    // Tests a table with region replication 3. Writes some data, and causes flushes and
+    // compactions. Verifies that the data is readable from the replicas. Note that this
+    // does not test whether the replicas actually pick up flushed files and apply compaction
+    // to their stores
+    int regionReplication = 3;
+    TableDescriptor htd = HTU.createModifyableTableDescriptor(name.getMethodName())
+      .setRegionReplication(regionReplication).build();
+    createOrEnableTableWithRetries(htd, true);
+    final TableName tableName = htd.getTableName();
+
+    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+    Table table = connection.getTable(tableName);
+    try {
+      // load the data to the table
+
+      for (int i = 0; i < 6000; i += 1000) {
+        LOG.info("Writing data from " + i + " to " + (i+1000));
+        HTU.loadNumericRows(table, HBaseTestingUtil.fam1, i, i+1000);
+        LOG.info("flushing table");
+        HTU.flush(tableName);
+        LOG.info("compacting table");
+        HTU.compact(tableName, false);
+      }
+
+      verifyReplication(tableName, regionReplication, 0, 1000);
+    } finally {
+      table.close();
+      connection.close();
+    }
+  }
+
+  private void createOrEnableTableWithRetries(TableDescriptor htd, boolean createTableOperation) {
+    // Helper function to run create/enable table operations with a retry feature
+    boolean continueToRetry = true;
+    int tries = 0;
+    while (continueToRetry && tries < 50) {
+      try {
+        continueToRetry = false;
+        if (createTableOperation) {
+          HTU.getAdmin().createTable(htd);
+        } else {
+          HTU.getAdmin().enableTable(htd.getTableName());
+        }
+      } catch (IOException e) {
+        if (e.getCause() instanceof ReplicationException) {
+          continueToRetry = true;
+          tries++;
+          Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+        }
+      }
+    }
+  }
+}
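
createOrEnableTableWithRetries in the new test retries the admin call only when the failure is
caused by a ReplicationException, up to 50 attempts with a one-second sleep. A generic sketch of
that bounded-retry idea follows; the helper and its names are illustrative and not part of the
patch.

    import java.util.concurrent.Callable;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Predicate;

    // Sketch: retry a task a limited number of times, sleeping between attempts, and only
    // when the failure matches a retriable condition.
    final class BoundedRetry {
      static <T> T run(Callable<T> task, int maxTries, long sleepMillis,
          Predicate<Exception> retriable) throws Exception {
        Exception last = null;
        for (int tries = 0; tries < maxTries; tries++) {
          try {
            return task.call();
          } catch (Exception e) {
            if (!retriable.test(e)) {
              throw e; // not retriable, surface immediately
            }
            last = e;
            TimeUnit.MILLISECONDS.sleep(sleepMillis);
          }
        }
        if (last != null) {
          throw last;
        }
        throw new IllegalStateException("maxTries must be positive");
      }
    }

With such a helper, the createTable branch above would read roughly:
BoundedRetry.run(() -> { HTU.getAdmin().createTable(htd); return null; }, 50, 1000,
e -> e.getCause() instanceof ReplicationException).
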
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
deleted file mode 100644
index d238e09..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
+++ /dev/null
@@ -1,515 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.Cell.Type;
-import org.apache.hadoop.hbase.CellBuilderFactory;
-import org.apache.hadoop.hbase.CellBuilderType;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.testclassification.FlakeyTests;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.WALKeyImpl;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
-
-/**
- * Tests RegionReplicaReplicationEndpoint class by setting up region replicas and verifying
- * async wal replication replays the edits to the secondary region in various scenarios.
- */
-@Category({FlakeyTests.class, LargeTests.class})
-public class TestRegionReplicaReplicationEndpoint {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestRegionReplicaReplicationEndpoint.class);
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestRegionReplicaReplicationEndpoint.class);
-
-  private static final int NB_SERVERS = 2;
-
-  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
-
-  @Rule
-  public TestName name = new TestName();
-
-  @BeforeClass
-  public static void beforeClass() throws Exception {
-    Configuration conf = HTU.getConfiguration();
-    conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
-    conf.setInt("replication.source.size.capacity", 10240);
-    conf.setLong("replication.source.sleepforretries", 100);
-    conf.setInt("hbase.regionserver.maxlogs", 10);
-    conf.setLong("hbase.master.logcleaner.ttl", 10);
-    conf.setInt("zookeeper.recovery.retry", 1);
-    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
-    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
-    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
-    conf.setInt("replication.stats.thread.period.seconds", 5);
-    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
-    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // less number of retries is needed
-    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
-
-    HTU.startMiniCluster(NB_SERVERS);
-  }
-
-  @AfterClass
-  public static void afterClass() throws Exception {
-    HTU.shutdownMiniCluster();
-  }
-
-  @Test
-  public void testRegionReplicaReplicationPeerIsCreated() throws IOException {
-    // create a table with region replicas. Check whether the replication peer is created
-    // and replication started.
-    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
-      Admin admin = connection.getAdmin()) {
-      String peerId = ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER;
-
-      ReplicationPeerConfig peerConfig = null;
-      try {
-        peerConfig = admin.getReplicationPeerConfig(peerId);
-      } catch (ReplicationPeerNotFoundException e) {
-        LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
-      }
-
-      if (peerConfig != null) {
-        admin.removeReplicationPeer(peerId);
-        peerConfig = null;
-      }
-
-      TableDescriptor htd = HTU
-        .createTableDescriptor(TableName.valueOf("testReplicationPeerIsCreated_no_region_replicas"),
-          ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
-          ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
-      createOrEnableTableWithRetries(htd, true);
-      try {
-        peerConfig = admin.getReplicationPeerConfig(peerId);
-        fail("Should throw ReplicationException, because replication peer id=" + peerId
-          + " not exist");
-      } catch (ReplicationPeerNotFoundException e) {
-      }
-      assertNull(peerConfig);
-
-      htd = HTU.createModifyableTableDescriptor(TableName.valueOf("testReplicationPeerIsCreated"),
-        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
-        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED).setRegionReplication(2).build();
-      createOrEnableTableWithRetries(htd, true);
-
-      // assert peer configuration is correct
-      peerConfig = admin.getReplicationPeerConfig(peerId);
-      assertNotNull(peerConfig);
-      assertEquals(peerConfig.getClusterKey(),
-        ZKConfig.getZooKeeperClusterKey(HTU.getConfiguration()));
-      assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
-        peerConfig.getReplicationEndpointImpl());
-    }
-  }
-
-  @Test
-  public void testRegionReplicaReplicationPeerIsCreatedForModifyTable() throws Exception {
-    // modify a table by adding region replicas. Check whether the replication peer is created
-    // and replication started.
-    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
-      Admin admin = connection.getAdmin()) {
-      String peerId = "region_replica_replication";
-
-      ReplicationPeerConfig peerConfig = null;
-      try {
-        peerConfig = admin.getReplicationPeerConfig(peerId);
-      } catch (ReplicationPeerNotFoundException e) {
-        LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
-      }
-
-      if (peerConfig != null) {
-        admin.removeReplicationPeer(peerId);
-        peerConfig = null;
-      }
-
-      TableDescriptor htd = HTU.createTableDescriptor(
-        TableName.valueOf("testRegionReplicaReplicationPeerIsCreatedForModifyTable"),
-        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
-        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
-      createOrEnableTableWithRetries(htd, true);
-
-      // assert that replication peer is not created yet
-      try {
-        peerConfig = admin.getReplicationPeerConfig(peerId);
-        fail("Should throw ReplicationException, because replication peer id=" + peerId
-          + " not exist");
-      } catch (ReplicationPeerNotFoundException e) {
-      }
-      assertNull(peerConfig);
-
-      HTU.getAdmin().disableTable(htd.getTableName());
-      htd = TableDescriptorBuilder.newBuilder(htd).setRegionReplication(2).build();
-      HTU.getAdmin().modifyTable(htd);
-      createOrEnableTableWithRetries(htd, false);
-
-      // assert peer configuration is correct
-      peerConfig = admin.getReplicationPeerConfig(peerId);
-      assertNotNull(peerConfig);
-      assertEquals(peerConfig.getClusterKey(),
-        ZKConfig.getZooKeeperClusterKey(HTU.getConfiguration()));
-      assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
-        peerConfig.getReplicationEndpointImpl());
-    }
-  }
-
-  public void testRegionReplicaReplication(int regionReplication) throws Exception {
-    // test region replica replication. Create a table with single region, write some data
-    // ensure that data is replicated to the secondary region
-    TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_"
-        + regionReplication);
-    TableDescriptor htd = HTU
-      .createModifyableTableDescriptor(TableName.valueOf(tableName.toString()),
-        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
-        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
-      .setRegionReplication(regionReplication).build();
-    createOrEnableTableWithRetries(htd, true);
-    TableName tableNameNoReplicas =
-        TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS");
-    HTU.deleteTableIfAny(tableNameNoReplicas);
-    HTU.createTable(tableNameNoReplicas, HBaseTestingUtil.fam1);
-
-    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
-    Table table = connection.getTable(tableName);
-    Table tableNoReplicas = connection.getTable(tableNameNoReplicas);
-
-    try {
-      // load some data to the non-replicated table
-      HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtil.fam1, 6000, 7000);
-
-      // load the data to the table
-      HTU.loadNumericRows(table, HBaseTestingUtil.fam1, 0, 1000);
-
-      verifyReplication(tableName, regionReplication, 0, 1000);
-
-    } finally {
-      table.close();
-      tableNoReplicas.close();
-      HTU.deleteTableIfAny(tableNameNoReplicas);
-      connection.close();
-    }
-  }
-
-  private void verifyReplication(TableName tableName, int regionReplication,
-      final int startRow, final int endRow) throws Exception {
-    verifyReplication(tableName, regionReplication, startRow, endRow, true);
-  }
-
-  private void verifyReplication(TableName tableName, int regionReplication,
-      final int startRow, final int endRow, final boolean present) throws Exception {
-    // find the regions
-    final Region[] regions = new Region[regionReplication];
-
-    for (int i=0; i < NB_SERVERS; i++) {
-      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
-      List<HRegion> onlineRegions = rs.getRegions(tableName);
-      for (HRegion region : onlineRegions) {
-        regions[region.getRegionInfo().getReplicaId()] = region;
-      }
-    }
-
-    for (Region region : regions) {
-      assertNotNull(region);
-    }
-
-    for (int i = 1; i < regionReplication; i++) {
-      final Region region = regions[i];
-      // wait until all the data is replicated to all secondary regions
-      Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          LOG.info("verifying replication for region replica:" + region.getRegionInfo());
-          try {
-            HTU.verifyNumericRows(region, HBaseTestingUtil.fam1, startRow, endRow, present);
-          } catch(Throwable ex) {
-            LOG.warn("Verification from secondary region is not complete yet", ex);
-            // still wait
-            return false;
-          }
-          return true;
-        }
-      });
-    }
-  }
-
-  @Test
-  public void testRegionReplicaReplicationWith2Replicas() throws Exception {
-    testRegionReplicaReplication(2);
-  }
-
-  @Test
-  public void testRegionReplicaReplicationWith3Replicas() throws Exception {
-    testRegionReplicaReplication(3);
-  }
-
-  @Test
-  public void testRegionReplicaReplicationWith10Replicas() throws Exception {
-    testRegionReplicaReplication(10);
-  }
-
-  @Test
-  public void testRegionReplicaWithoutMemstoreReplication() throws Exception {
-    int regionReplication = 3;
-    TableDescriptor htd = HTU.createModifyableTableDescriptor(name.getMethodName())
-      .setRegionReplication(regionReplication).setRegionMemStoreReplication(false).build();
-    createOrEnableTableWithRetries(htd, true);
-    final TableName tableName = htd.getTableName();
-    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
-    Table table = connection.getTable(tableName);
-    try {
-      // write data to the primary. The replicas should not receive the data
-      final int STEP = 100;
-      for (int i = 0; i < 3; ++i) {
-        final int startRow = i * STEP;
-        final int endRow = (i + 1) * STEP;
-        LOG.info("Writing data from " + startRow + " to " + endRow);
-        HTU.loadNumericRows(table, HBaseTestingUtil.fam1, startRow, endRow);
-        verifyReplication(tableName, regionReplication, startRow, endRow, false);
-
-        // Flush the table, now the data should show up in the replicas
-        LOG.info("flushing table");
-        HTU.flush(tableName);
-        verifyReplication(tableName, regionReplication, 0, endRow, true);
-      }
-    } finally {
-      table.close();
-      connection.close();
-    }
-  }
-
-  @Test
-  public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception {
-    // Tests a table with region replication 3. Writes some data, and causes flushes and
-    // compactions. Verifies that the data is readable from the replicas. Note that this
-    // does not test whether the replicas actually pick up flushed files and apply compaction
-    // to their stores
-    int regionReplication = 3;
-    TableDescriptor htd = HTU.createModifyableTableDescriptor(name.getMethodName())
-      .setRegionReplication(regionReplication).build();
-    createOrEnableTableWithRetries(htd, true);
-    final TableName tableName = htd.getTableName();
-
-    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
-    Table table = connection.getTable(tableName);
-    try {
-      // load the data to the table
-
-      for (int i = 0; i < 6000; i += 1000) {
-        LOG.info("Writing data from " + i + " to " + (i+1000));
-        HTU.loadNumericRows(table, HBaseTestingUtil.fam1, i, i+1000);
-        LOG.info("flushing table");
-        HTU.flush(tableName);
-        LOG.info("compacting table");
-        HTU.compact(tableName, false);
-      }
-
-      verifyReplication(tableName, regionReplication, 0, 1000);
-    } finally {
-      table.close();
-      connection.close();
-    }
-  }
-
-  @Test
-  public void testRegionReplicaReplicationIgnoresDisabledTables() throws Exception {
-    testRegionReplicaReplicationIgnores(false, false);
-  }
-
-  @Test
-  public void testRegionReplicaReplicationIgnoresDroppedTables() throws Exception {
-    testRegionReplicaReplicationIgnores(true, false);
-  }
-
-  @Test
-  public void testRegionReplicaReplicationIgnoresNonReplicatedTables() throws Exception {
-    testRegionReplicaReplicationIgnores(false, true);
-  }
-
-  private void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication)
-      throws Exception {
-    // tests having edits from a disabled or dropped table is handled correctly by skipping those
-    // entries and further edits after the edits from dropped/disabled table can be replicated
-    // without problems.
-    int regionReplication = 3;
-    TableDescriptor htd = HTU
-      .createModifyableTableDescriptor(
-        name.getMethodName() + "_drop_" + dropTable + "_disabledReplication_" + disableReplication)
-      .setRegionReplication(regionReplication).build();
-    final TableName tableName = htd.getTableName();
-    HTU.deleteTableIfAny(tableName);
-
-    createOrEnableTableWithRetries(htd, true);
-    TableName toBeDisabledTable = TableName.valueOf(
-      dropTable ? "droppedTable" : (disableReplication ? "disableReplication" : "disabledTable"));
-    HTU.deleteTableIfAny(toBeDisabledTable);
-    htd = HTU
-      .createModifyableTableDescriptor(toBeDisabledTable,
-        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
-        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
-      .setRegionReplication(regionReplication).build();
-    createOrEnableTableWithRetries(htd, true);
-
-    // both tables are created, now pause replication
-    HTU.getAdmin().disableReplicationPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
-
-    // now that the replication is disabled, write to the table to be dropped, then drop the table.
-
-    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
-    Table table = connection.getTable(tableName);
-    Table tableToBeDisabled = connection.getTable(toBeDisabledTable);
-
-    HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtil.fam1, 6000, 7000);
-
-    RegionLocator rl = connection.getRegionLocator(toBeDisabledTable);
-    HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
-    byte[] encodedRegionName = hrl.getRegion().getEncodedNameAsBytes();
-
-    Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("A"))
-        .setFamily(HTU.fam1).setValue(Bytes.toBytes("VAL")).setType(Type.Put).build();
-    Entry entry = new Entry(
-      new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1),
-        new WALEdit()
-            .add(cell));
-    HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
-    if (dropTable) {
-      HTU.getAdmin().deleteTable(toBeDisabledTable);
-    } else if (disableReplication) {
-      htd =
-        TableDescriptorBuilder.newBuilder(htd).setRegionReplication(regionReplication - 2).build();
-      HTU.getAdmin().modifyTable(htd);
-      createOrEnableTableWithRetries(htd, false);
-    }
-
-    HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(0);
-    MetricsSource metrics = mock(MetricsSource.class);
-    ReplicationEndpoint.Context ctx =
-      new ReplicationEndpoint.Context(rs, HTU.getConfiguration(), HTU.getConfiguration(),
-        HTU.getTestFileSystem(), ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER,
-        UUID.fromString(rs.getClusterId()), rs.getReplicationSourceService().getReplicationPeers()
-          .getPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER),
-        metrics, rs.getTableDescriptors(), rs);
-    RegionReplicaReplicationEndpoint rrpe = new RegionReplicaReplicationEndpoint();
-    rrpe.init(ctx);
-    rrpe.start();
-    ReplicationEndpoint.ReplicateContext repCtx = new ReplicationEndpoint.ReplicateContext();
-    repCtx.setEntries(Lists.newArrayList(entry, entry));
-    assertTrue(rrpe.replicate(repCtx));
-    verify(metrics, times(1)).incrLogEditsFiltered(eq(2L));
-    rrpe.stop();
-    if (disableReplication) {
-      // enable replication again so that we can verify replication
-      HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
-      htd = TableDescriptorBuilder.newBuilder(htd).setRegionReplication(regionReplication).build();
-      HTU.getAdmin().modifyTable(htd);
-      createOrEnableTableWithRetries(htd, false);
-    }
-
-    try {
-      // load some data to the to-be-dropped table
-      // load the data to the table
-      HTU.loadNumericRows(table, HBaseTestingUtil.fam1, 0, 1000);
-
-      // now enable the replication
-      HTU.getAdmin().enableReplicationPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
-
-      verifyReplication(tableName, regionReplication, 0, 1000);
-    } finally {
-      table.close();
-      rl.close();
-      tableToBeDisabled.close();
-      HTU.deleteTableIfAny(toBeDisabledTable);
-      connection.close();
-    }
-  }
-
-  private void createOrEnableTableWithRetries(TableDescriptor htd, boolean createTableOperation) {
-    // Helper function to run create/enable table operations with a retry feature
-    boolean continueToRetry = true;
-    int tries = 0;
-    while (continueToRetry && tries < 50) {
-      try {
-        continueToRetry = false;
-        if (createTableOperation) {
-          HTU.getAdmin().createTable(htd);
-        } else {
-          HTU.getAdmin().enableTable(htd.getTableName());
-        }
-      } catch (IOException e) {
-        if (e.getCause() instanceof ReplicationException) {
-          continueToRetry = true;
-          tries++;
-          Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
-        }
-      }
-    }
-  }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
deleted file mode 100644
index a37ab5f..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
-import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Optional;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.StartTestingClusterOption;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.AsyncClusterConnection;
-import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.WALCoprocessor;
-import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.WALObserver;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
-import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.wal.WALKeyImpl;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-
-/**
- * Tests RegionReplicaReplicationEndpoint. Unlike TestRegionReplicaReplicationEndpoint this
- * class contains lower level tests using callables.
- */
-@Category({ReplicationTests.class, MediumTests.class})
-public class TestRegionReplicaReplicationEndpointNoMaster {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestRegionReplicaReplicationEndpointNoMaster.class);
-
-  private static final int NB_SERVERS = 2;
-  private static TableName tableName = TableName.valueOf(
-    TestRegionReplicaReplicationEndpointNoMaster.class.getSimpleName());
-  private static Table table;
-  private static final byte[] row = Bytes.toBytes("TestRegionReplicaReplicator");
-
-  private static HRegionServer rs0;
-  private static HRegionServer rs1;
-
-  private static RegionInfo hriPrimary;
-  private static RegionInfo hriSecondary;
-
-  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
-  private static final byte[] f = HConstants.CATALOG_FAMILY;
-
-  @BeforeClass
-  public static void beforeClass() throws Exception {
-    Configuration conf = HTU.getConfiguration();
-    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
-    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false);
-
-    // install WALObserver coprocessor for tests
-    String walCoprocs = HTU.getConfiguration().get(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY);
-    if (walCoprocs == null) {
-      walCoprocs = WALEditCopro.class.getName();
-    } else {
-      walCoprocs += "," + WALEditCopro.class.getName();
-    }
-    HTU.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
-      walCoprocs);
-    StartTestingClusterOption option = StartTestingClusterOption.builder()
-      .numAlwaysStandByMasters(1).numRegionServers(NB_SERVERS).numDataNodes(NB_SERVERS).build();
-    HTU.startMiniCluster(option);
-
-    // Create table then get the single region for our new table.
-    TableDescriptor htd = HTU.createTableDescriptor(TableName.valueOf(tableName.getNameAsString()),
-      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
-      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
-    table = HTU.createTable(htd, new byte[][]{f}, null);
-
-    try (RegionLocator locator = HTU.getConnection().getRegionLocator(tableName)) {
-      hriPrimary = locator.getRegionLocation(row, false).getRegion();
-    }
-
-    // mock a secondary region info to open
-    hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);
-
-    // No master
-    TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
-    rs0 = HTU.getMiniHBaseCluster().getRegionServer(0);
-    rs1 = HTU.getMiniHBaseCluster().getRegionServer(1);
-  }
-
-  @AfterClass
-  public static void afterClass() throws Exception {
-    HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
-    table.close();
-    HTU.shutdownMiniCluster();
-  }
-
-  @Before
-  public void before() throws Exception {
-    entries.clear();
-  }
-
-  @After
-  public void after() throws Exception {
-  }
-
-  static ConcurrentLinkedQueue<Entry> entries = new ConcurrentLinkedQueue<>();
-
-  public static class WALEditCopro implements WALCoprocessor, WALObserver {
-    public WALEditCopro() {
-      entries.clear();
-    }
-
-    @Override
-    public Optional<WALObserver> getWALObserver() {
-      return Optional.of(this);
-    }
-
-    @Override
-    public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
-                             RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
-      // only keep primary region's edits
-      if (logKey.getTableName().equals(tableName) && info.getReplicaId() == 0) {
-        // Presume type is a WALKeyImpl
-        entries.add(new Entry((WALKeyImpl)logKey, logEdit));
-      }
-    }
-  }
-
-  @Test
-  public void testReplayCallable() throws Exception {
-    // tests replaying the edits to a secondary region replica using the Callable directly
-    openRegion(HTU, rs0, hriSecondary);
-
-    // load some data to primary
-    HTU.loadNumericRows(table, f, 0, 1000);
-
-    Assert.assertEquals(1000, entries.size());
-    try (AsyncClusterConnection conn = ClusterConnectionFactory
-      .createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) {
-      // replay the edits to the secondary using replay callable
-      replicateUsingCallable(conn, entries);
-    }
-
-    Region region = rs0.getRegion(hriSecondary.getEncodedName());
-    HTU.verifyNumericRows(region, f, 0, 1000);
-
-    HTU.deleteNumericRows(table, f, 0, 1000);
-    closeRegion(HTU, rs0, hriSecondary);
-  }
-
-  private void replicateUsingCallable(AsyncClusterConnection connection, Queue<Entry> entries)
-      throws IOException, ExecutionException, InterruptedException {
-    Entry entry;
-    while ((entry = entries.poll()) != null) {
-      byte[] row = CellUtil.cloneRow(entry.getEdit().getCells().get(0));
-      RegionLocations locations = connection.getRegionLocations(tableName, row, true).get();
-      connection
-        .replay(tableName, locations.getRegionLocation(1).getRegion().getEncodedNameAsBytes(), row,
-          Collections.singletonList(entry), 1, Integer.MAX_VALUE, TimeUnit.SECONDS.toNanos(10))
-        .get();
-    }
-  }
-
-  @Test
-  public void testReplayCallableWithRegionMove() throws Exception {
-    // tests replaying the edits to a secondary region replica using the Callable directly while
-    // the region is moved to another location. It tests handling of RME.
-    try (AsyncClusterConnection conn = ClusterConnectionFactory
-      .createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) {
-      openRegion(HTU, rs0, hriSecondary);
-      // load some data to primary
-      HTU.loadNumericRows(table, f, 0, 1000);
-
-      Assert.assertEquals(1000, entries.size());
-
-      // replay the edits to the secondary using replay callable
-      replicateUsingCallable(conn, entries);
-
-      Region region = rs0.getRegion(hriSecondary.getEncodedName());
-      HTU.verifyNumericRows(region, f, 0, 1000);
-
-      HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary
-
-      // move the secondary region from RS0 to RS1
-      closeRegion(HTU, rs0, hriSecondary);
-      openRegion(HTU, rs1, hriSecondary);
-
-      // replicate the new data
-      replicateUsingCallable(conn, entries);
-
-      region = rs1.getRegion(hriSecondary.getEncodedName());
-      // verify the new data. old data may or may not be there
-      HTU.verifyNumericRows(region, f, 1000, 2000);
-
-      HTU.deleteNumericRows(table, f, 0, 2000);
-      closeRegion(HTU, rs1, hriSecondary);
-    }
-  }
-
-  @Test
-  public void testRegionReplicaReplicationEndpointReplicate() throws Exception {
-    // tests replaying the edits to a secondary region replica using the RRRE.replicate()
-    openRegion(HTU, rs0, hriSecondary);
-    RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint();
-
-    ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
-    when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
-    when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
-    when(context.getServer()).thenReturn(rs0);
-    when(context.getTableDescriptors()).thenReturn(rs0.getTableDescriptors());
-    replicator.init(context);
-    replicator.startAsync();
-
-    //load some data to primary
-    HTU.loadNumericRows(table, f, 0, 1000);
-
-    Assert.assertEquals(1000, entries.size());
-    // replay the edits to the secondary using replay callable
-    final String fakeWalGroupId = "fakeWALGroup";
-    replicator.replicate(new ReplicateContext().setEntries(Lists.newArrayList(entries))
-        .setWalGroupId(fakeWalGroupId));
-    replicator.stop();
-    Region region = rs0.getRegion(hriSecondary.getEncodedName());
-    HTU.verifyNumericRows(region, f, 0, 1000);
-
-    HTU.deleteNumericRows(table, f, 0, 1000);
-    closeRegion(HTU, rs0, hriSecondary);
-  }
-}

[hbase] 04/06: HBASE-26449 The way we add or clear failedReplicas may have race (#3846)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-26233
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 8083542f2b51241da38d3dc0f448102933657b47
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Sun Nov 14 20:52:04 2021 +0800

    HBASE-26449 The way we add or clear failedReplicas may have race (#3846)
    
    Signed-off-by: Xin Sun <dd...@gmail.com>
---
 .../regionreplication/RegionReplicationSink.java   |  28 ++++-
 .../TestRegionReplicationSink.java                 | 130 ++++++++++++++++++++-
 2 files changed, 150 insertions(+), 8 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
index 9c6f6e2..68aa508 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
@@ -30,6 +30,7 @@ import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import org.agrona.collections.IntHashSet;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -123,7 +124,7 @@ public class RegionReplicationSink {
 
   // used to track the replicas which we failed to replicate edits to them
   // will be cleared after we get a flush edit.
-  private final Set<Integer> failedReplicas = new HashSet<>();
+  private final IntHashSet failedReplicas = new IntHashSet();
 
   private final Queue<SinkEntry> entries = new ArrayDeque<>();
 
@@ -135,6 +136,8 @@ public class RegionReplicationSink {
 
   private volatile long pendingSize;
 
+  private long lastFlushSequenceNumber;
+
   private boolean sending;
 
   private boolean stopping;
@@ -162,8 +165,10 @@ public class RegionReplicationSink {
 
   private void onComplete(List<SinkEntry> sent,
     Map<Integer, MutableObject<Throwable>> replica2Error) {
+    long maxSequenceId = Long.MIN_VALUE;
     long toReleaseSize = 0;
     for (SinkEntry entry : sent) {
+      maxSequenceId = Math.max(maxSequenceId, entry.key.getSequenceId());
       entry.replicated();
       toReleaseSize += entry.size;
     }
@@ -173,9 +178,20 @@ public class RegionReplicationSink {
       Integer replicaId = entry.getKey();
       Throwable error = entry.getValue().getValue();
       if (error != null) {
-        LOG.warn("Failed to replicate to secondary replica {} for {}, stop replicating" +
-          " for a while and trigger a flush", replicaId, primary, error);
-        failed.add(replicaId);
+        if (maxSequenceId > lastFlushSequenceNumber) {
+          LOG.warn(
+            "Failed to replicate to secondary replica {} for {}, since the max sequence" +
+              " id of sunk entris is {}, which is greater than the last flush SN {}," +
+              " we will stop replicating for a while and trigger a flush",
+            replicaId, primary, maxSequenceId, lastFlushSequenceNumber, error);
+          failed.add(replicaId);
+        } else {
+          LOG.warn(
+            "Failed to replicate to secondary replica {} for {}, since the max sequence" +
+              " id of sunk entris is {}, which is less than or equal to the last flush SN {}," +
+              " we will not stop replicating",
+            replicaId, primary, maxSequenceId, lastFlushSequenceNumber, error);
+        }
       }
     }
     synchronized (entries) {
@@ -215,6 +231,9 @@ public class RegionReplicationSink {
     AtomicInteger remaining = new AtomicInteger(toSendReplicaCount);
     Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
     for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
+      if (failedReplicas.contains(replicaId)) {
+        continue;
+      }
       MutableObject<Throwable> error = new MutableObject<>();
       replica2Error.put(replicaId, error);
       RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, replicaId);
@@ -292,6 +311,7 @@ public class RegionReplicationSink {
                   break;
                 }
               }
+              lastFlushSequenceNumber = flushDesc.getFlushSequenceNumber();
               failedReplicas.clear();
               LOG.debug(
                 "Got a flush all request with sequence id {}, clear failed replicas {}" +
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
index 19b1698..248cdba 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
@@ -28,11 +28,19 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableNameTestRule;
@@ -45,14 +53,20 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.ipc.ServerCall;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
+
 @Category({ RegionServerTests.class, MediumTests.class })
 public class TestRegionReplicationSink {
 
@@ -72,6 +86,8 @@ public class TestRegionReplicationSink {
 
   private RegionReplicationBufferManager manager;
 
+  private RegionReplicationSink sink;
+
   @Rule
   public final TableNameTestRule name = new TableNameTestRule();
 
@@ -84,15 +100,17 @@ public class TestRegionReplicationSink {
     flushRequester = mock(Runnable.class);
     conn = mock(AsyncClusterConnection.class);
     manager = mock(RegionReplicationBufferManager.class);
+    sink = new RegionReplicationSink(conf, primary, td, manager, flushRequester, conn);
   }
 
-  private RegionReplicationSink create() {
-    return new RegionReplicationSink(conf, primary, td, manager, flushRequester, conn);
+  @After
+  public void tearDown() throws InterruptedException {
+    sink.stop();
+    sink.waitUntilStopped();
   }
 
   @Test
   public void testNormal() {
-    RegionReplicationSink sink = create();
     MutableInt next = new MutableInt(0);
     List<CompletableFuture<Void>> futures =
       Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
@@ -129,7 +147,6 @@ public class TestRegionReplicationSink {
 
   @Test
   public void testDropEdits() {
-    RegionReplicationSink sink = create();
     MutableInt next = new MutableInt(0);
     List<CompletableFuture<Void>> futures =
       Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
@@ -191,4 +208,109 @@ public class TestRegionReplicationSink {
     // replicas
     verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
   }
+
+  @Test
+  public void testNotAddToFailedReplicas() {
+    MutableInt next = new MutableInt(0);
+    List<CompletableFuture<Void>> futures =
+      Stream.generate(() -> new CompletableFuture<Void>()).limit(4).collect(Collectors.toList());
+    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
+      .then(i -> futures.get(next.getAndIncrement()));
+
+    ServerCall<?> rpcCall1 = mock(ServerCall.class);
+    WALKeyImpl key1 = mock(WALKeyImpl.class);
+    when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
+    when(key1.getSequenceId()).thenReturn(1L);
+    WALEdit edit1 = mock(WALEdit.class);
+    when(edit1.estimatedSerializedSizeOf()).thenReturn(1000L);
+    when(manager.increase(anyLong())).thenReturn(true);
+    sink.add(key1, edit1, rpcCall1);
+
+    ServerCall<?> rpcCall2 = mock(ServerCall.class);
+    WALKeyImpl key2 = mock(WALKeyImpl.class);
+    when(key2.estimatedSerializedSizeOf()).thenReturn(200L);
+    when(key2.getSequenceId()).thenReturn(3L);
+
+    Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream()
+      .collect(Collectors.toMap(Function.identity(), k -> Collections.emptyList(), (u, v) -> {
+        throw new IllegalStateException();
+      }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
+    FlushDescriptor fd =
+      ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 2L, committedFiles);
+    WALEdit edit2 = WALEdit.createFlushWALEdit(primary, fd);
+    sink.add(key2, edit2, rpcCall2);
+
+    // fail the call to replica 2
+    futures.get(0).complete(null);
+    futures.get(1).completeExceptionally(new IOException("inject error"));
+
+    // the failure should not cause replica 2 to be added to failedReplicas, as we have already
+    // triggered a flush after it.
+    verify(conn, times(4)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+
+    futures.get(2).complete(null);
+    futures.get(3).complete(null);
+
+    // should have sent out all so no pending entries.
+    assertEquals(0, sink.pendingSize());
+  }
+
+  @Test
+  public void testAddToFailedReplica() {
+    MutableInt next = new MutableInt(0);
+    List<CompletableFuture<Void>> futures =
+      Stream.generate(() -> new CompletableFuture<Void>()).limit(5).collect(Collectors.toList());
+    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
+      .then(i -> futures.get(next.getAndIncrement()));
+
+    ServerCall<?> rpcCall1 = mock(ServerCall.class);
+    WALKeyImpl key1 = mock(WALKeyImpl.class);
+    when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
+    when(key1.getSequenceId()).thenReturn(1L);
+    WALEdit edit1 = mock(WALEdit.class);
+    when(edit1.estimatedSerializedSizeOf()).thenReturn(1000L);
+    when(manager.increase(anyLong())).thenReturn(true);
+    sink.add(key1, edit1, rpcCall1);
+
+    ServerCall<?> rpcCall2 = mock(ServerCall.class);
+    WALKeyImpl key2 = mock(WALKeyImpl.class);
+    when(key2.estimatedSerializedSizeOf()).thenReturn(200L);
+    when(key2.getSequenceId()).thenReturn(1L);
+    WALEdit edit2 = mock(WALEdit.class);
+    when(edit2.estimatedSerializedSizeOf()).thenReturn(2000L);
+    when(manager.increase(anyLong())).thenReturn(true);
+    sink.add(key2, edit2, rpcCall2);
+
+    // fail the call to replica 2
+    futures.get(0).complete(null);
+    futures.get(1).completeExceptionally(new IOException("inject error"));
+
+    // we should only call replicate once for edit2, since replica 2 is marked as failed
+    verify(conn, times(3)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+    futures.get(2).complete(null);
+    // should have sent out all so no pending entries.
+    assertEquals(0, sink.pendingSize());
+
+    ServerCall<?> rpcCall3 = mock(ServerCall.class);
+    WALKeyImpl key3 = mock(WALKeyImpl.class);
+    when(key3.estimatedSerializedSizeOf()).thenReturn(200L);
+    when(key3.getSequenceId()).thenReturn(3L);
+    Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream()
+      .collect(Collectors.toMap(Function.identity(), k -> Collections.emptyList(), (u, v) -> {
+        throw new IllegalStateException();
+      }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
+    FlushDescriptor fd =
+      ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 2L, committedFiles);
+    WALEdit edit3 = WALEdit.createFlushWALEdit(primary, fd);
+    sink.add(key3, edit3, rpcCall3);
+
+    // the flush marker should have cleared the failedReplicas, so we will send the edit to 2
+    // replicas again
+    verify(conn, times(5)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+    futures.get(3).complete(null);
+    futures.get(4).complete(null);
+
+    // should have sent out all so no pending entries.
+    assertEquals(0, sink.pendingSize());
+  }
 }
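
For readers following the RegionReplicationSink change above, a minimal standalone sketch of
the bookkeeping rule HBASE-26449 introduces (simplified types chosen for illustration; the real
sink guards this state with its entries lock and uses agrona's IntHashSet): a replica is only
marked failed when the highest sequence id in the failed batch is newer than the last flush,
and replicas already marked failed are skipped on later sends until a flush-all edit clears the
set.

    import java.util.HashSet;
    import java.util.Set;

    // Sketch only: FailedReplicaTracker is an illustrative name, not part of HBase.
    class FailedReplicaTracker {
      private final Set<Integer> failedReplicas = new HashSet<>();
      private long lastFlushSequenceNumber;

      // Called when replicating a batch of entries to one secondary replica fails.
      void onReplicateFailure(int replicaId, long maxSequenceIdOfBatch) {
        // Only blacklist the replica if the failed entries are newer than the last flush;
        // otherwise a flushed store file already covers them and replication can continue.
        if (maxSequenceIdOfBatch > lastFlushSequenceNumber) {
          failedReplicas.add(replicaId);
        }
      }

      // Called when a flush-all marker for the primary region is observed.
      void onFlushAll(long flushSequenceNumber) {
        lastFlushSequenceNumber = flushSequenceNumber;
        failedReplicas.clear();
      }

      // Replicas currently marked failed are skipped when sending new batches.
      boolean shouldSkip(int replicaId) {
        return failedReplicas.contains(replicaId);
      }
    }

This is the behaviour testNotAddToFailedReplicas above exercises: a failure on an entry whose
sequence id is already covered by a later flush no longer blacklists the replica.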

[hbase] 06/06: HBASE-26457 Should not always clear all the failed replicas when getting a flush all request (#3850)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-26233
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 116f462653615acb9473aca942aedc9df4211c29
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Wed Nov 17 23:20:22 2021 +0800

    HBASE-26457 Should not always clear all the failed replicas when getting a flush all request (#3850)
    
    Signed-off-by: Xiaolin Ha <ha...@apache.org>
---
 .../regionreplication/RegionReplicationSink.java   | 119 +++++++++++++--------
 .../TestRegionReplicationSink.java                 |  87 +++++++++++++++
 2 files changed, 162 insertions(+), 44 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
index 9095870..d5e2387 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
@@ -22,15 +22,16 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
-import org.agrona.collections.IntHashSet;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -123,8 +124,11 @@ public class RegionReplicationSink {
   private final AsyncClusterConnection conn;
 
   // used to track the replicas which we failed to replicate edits to them
-  // will be cleared after we get a flush edit.
-  private final IntHashSet failedReplicas = new IntHashSet();
+  // the key is the replica id, the value is the sequence id of the last failed edit
+  // when we get a flush all request, we will try to remove a replica from this map, the key point
+  // here is the flush sequence number must be greater than the failed sequence id, otherwise we
+  // should not remove the replica from this map
+  private final Map<Integer, Long> failedReplicas = new HashMap<>();
 
   private final Queue<SinkEntry> entries = new ArrayDeque<>();
 
@@ -180,16 +184,16 @@ public class RegionReplicationSink {
       if (error != null) {
         if (maxSequenceId > lastFlushedSequenceId) {
           LOG.warn(
-            "Failed to replicate to secondary replica {} for {}, since the max sequence" +
-              " id of sunk entris is {}, which is greater than the last flush SN {}," +
-              " we will stop replicating for a while and trigger a flush",
+            "Failed to replicate to secondary replica {} for {}, since the max sequence"
+              + " id of sunk entris is {}, which is greater than the last flush SN {},"
+              + " we will stop replicating for a while and trigger a flush",
             replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
           failed.add(replicaId);
         } else {
           LOG.warn(
-            "Failed to replicate to secondary replica {} for {}, since the max sequence" +
-              " id of sunk entris is {}, which is less than or equal to the last flush SN {}," +
-              " we will not stop replicating",
+            "Failed to replicate to secondary replica {} for {}, since the max sequence"
+              + " id of sunk entris is {}, which is less than or equal to the last flush SN {},"
+              + " we will not stop replicating",
             replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
         }
       }
@@ -197,7 +201,9 @@ public class RegionReplicationSink {
     synchronized (entries) {
       pendingSize -= toReleaseSize;
       if (!failed.isEmpty()) {
-        failedReplicas.addAll(failed);
+        for (Integer replicaId : failed) {
+          failedReplicas.put(replicaId, maxSequenceId);
+        }
         flushRequester.requestFlush(maxSequenceId);
       }
       sending = false;
@@ -231,7 +237,7 @@ public class RegionReplicationSink {
     AtomicInteger remaining = new AtomicInteger(toSendReplicaCount);
     Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
     for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
-      if (failedReplicas.contains(replicaId)) {
+      if (failedReplicas.containsKey(replicaId)) {
         continue;
       }
       MutableObject<Throwable> error = new MutableObject<>();
@@ -247,7 +253,7 @@ public class RegionReplicationSink {
     }
   }
 
-  private boolean flushAllStores(FlushDescriptor flushDesc) {
+  private boolean isFlushAllStores(FlushDescriptor flushDesc) {
     Set<byte[]> storesFlushed =
       flushDesc.getStoreFlushesList().stream().map(sfd -> sfd.getFamilyName().toByteArray())
         .collect(Collectors.toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR)));
@@ -257,6 +263,24 @@ public class RegionReplicationSink {
     return storesFlushed.containsAll(tableDesc.getColumnFamilyNames());
   }
 
+  private Optional<FlushDescriptor> getFlushAllDescriptor(Cell metaCell) {
+    if (!CellUtil.matchingFamily(metaCell, WALEdit.METAFAMILY)) {
+      return Optional.empty();
+    }
+    FlushDescriptor flushDesc;
+    try {
+      flushDesc = WALEdit.getFlushDescriptor(metaCell);
+    } catch (IOException e) {
+      LOG.warn("Failed to parse FlushDescriptor from {}", metaCell);
+      return Optional.empty();
+    }
+    if (flushDesc != null && isFlushAllStores(flushDesc)) {
+      return Optional.of(flushDesc);
+    } else {
+      return Optional.empty();
+    }
+  }
+
   private void clearAllEntries() {
     long toClearSize = 0;
     for (SinkEntry entry : entries) {
@@ -268,6 +292,20 @@ public class RegionReplicationSink {
     manager.decrease(toClearSize);
   }
 
+  private void clearFailedReplica(long flushSequenceNumber) {
+    for (Iterator<Map.Entry<Integer, Long>> iter = failedReplicas.entrySet().iterator(); iter
+      .hasNext();) {
+      Map.Entry<Integer, Long> entry = iter.next();
+      if (entry.getValue().longValue() < flushSequenceNumber) {
+        LOG.debug(
+          "Got a flush all request with sequence id {}, clear failed replica {}" +
+            " with last failed sequence id {}",
+          flushSequenceNumber, entry.getKey(), entry.getValue());
+        iter.remove();
+      }
+    }
+  }
+
   /**
    * Add this edit to replication queue.
    * <p/>
@@ -287,41 +325,34 @@ public class RegionReplicationSink {
         // check whether we flushed all stores, which means we could drop all the previous edits,
         // and also, recover from the previous failure of some replicas
         for (Cell metaCell : edit.getCells()) {
-          if (CellUtil.matchingFamily(metaCell, WALEdit.METAFAMILY)) {
-            FlushDescriptor flushDesc;
-            try {
-              flushDesc = WALEdit.getFlushDescriptor(metaCell);
-            } catch (IOException e) {
-              LOG.warn("Failed to parse FlushDescriptor from {}", metaCell);
-              continue;
-            }
-            if (flushDesc != null && flushAllStores(flushDesc)) {
-              long flushedSequenceId = flushDesc.getFlushSequenceNumber();
-              int toClearCount = 0;
-              long toClearSize = 0;
-              for (;;) {
-                SinkEntry e = entries.peek();
-                if (e == null) {
-                  break;
-                }
-                if (e.key.getSequenceId() < flushedSequenceId) {
-                  entries.poll();
-                  toClearCount++;
-                  toClearSize += e.size;
-                } else {
-                  break;
-                }
+          getFlushAllDescriptor(metaCell).ifPresent(flushDesc -> {
+            long flushSequenceNumber = flushDesc.getFlushSequenceNumber();
+            int toClearCount = 0;
+            long toClearSize = 0;
+            for (;;) {
+              SinkEntry e = entries.peek();
+              if (e == null) {
+                break;
               }
-              lastFlushedSequenceId = flushedSequenceId;
-              failedReplicas.clear();
+              if (e.key.getSequenceId() < flushSequenceNumber) {
+                entries.poll();
+                toClearCount++;
+                toClearSize += e.size;
+              } else {
+                break;
+              }
+            }
+            lastFlushedSequenceId = flushSequenceNumber;
+            if (LOG.isDebugEnabled()) {
               LOG.debug(
-                "Got a flush all request with sequence id {}, clear failed replicas {}" +
-                  " and {} pending entries with size {}",
-                flushedSequenceId, failedReplicas, toClearCount,
+                "Got a flush all request with sequence id {}, clear {} pending"
+                  + " entries with size {}",
+                flushSequenceNumber, toClearCount,
                 StringUtils.TraditionalBinaryPrefix.long2String(toClearSize, "", 1));
-              flushRequester.recordFlush(flushedSequenceId);
             }
-          }
+            clearFailedReplica(flushSequenceNumber);
+            flushRequester.recordFlush(flushSequenceNumber);
+          });
         }
       }
       if (failedReplicas.size() == regionReplication - 1) {
@@ -340,7 +371,7 @@ public class RegionReplicationSink {
         // failed
         clearAllEntries();
         for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
-          failedReplicas.add(replicaId);
+          failedReplicas.put(replicaId, entry.key.getSequenceId());
         }
         flushRequester.requestFlush(entry.key.getSequenceId());
       }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
index 248cdba..76a224b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
@@ -313,4 +313,91 @@ public class TestRegionReplicationSink {
     // should have sent out all so no pending entries.
     assertEquals(0, sink.pendingSize());
   }
+
+  @Test
+  public void testNotClearFailedReplica() {
+    // simulate this scenario:
+    // 1. prepare flush
+    // 2. add one edit whose replication to a replica fails
+    // 3. commit flush with a flush sequence number less than that of the previous edit (this is
+    // the normal case)
+    // we should not clear the failed replica, as this flush does not cover the broken edit; an
+    // extra flush is needed to flush it out
+    MutableInt next = new MutableInt(0);
+    List<CompletableFuture<Void>> futures =
+      Stream.generate(() -> new CompletableFuture<Void>()).limit(8).collect(Collectors.toList());
+    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
+      .then(i -> futures.get(next.getAndIncrement()));
+    when(manager.increase(anyLong())).thenReturn(true);
+
+    ServerCall<?> rpcCall1 = mock(ServerCall.class);
+    WALKeyImpl key1 = mock(WALKeyImpl.class);
+    when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
+    when(key1.getSequenceId()).thenReturn(1L);
+    Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream()
+      .collect(Collectors.toMap(Function.identity(), k -> Collections.emptyList(), (u, v) -> {
+        throw new IllegalStateException();
+      }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
+    FlushDescriptor fd =
+      ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, primary, 1L, committedFiles);
+    WALEdit edit1 = WALEdit.createFlushWALEdit(primary, fd);
+    sink.add(key1, edit1, rpcCall1);
+
+    futures.get(0).complete(null);
+    futures.get(1).complete(null);
+
+    ServerCall<?> rpcCall2 = mock(ServerCall.class);
+    WALKeyImpl key2 = mock(WALKeyImpl.class);
+    when(key2.estimatedSerializedSizeOf()).thenReturn(200L);
+    when(key2.getSequenceId()).thenReturn(2L);
+    WALEdit edit2 = mock(WALEdit.class);
+    when(edit2.estimatedSerializedSizeOf()).thenReturn(2000L);
+    sink.add(key2, edit2, rpcCall2);
+
+    // fail the call to replica 1
+    futures.get(2).completeExceptionally(new IOException("inject error"));
+    futures.get(3).complete(null);
+
+    ServerCall<?> rpcCall3 = mock(ServerCall.class);
+    WALKeyImpl key3 = mock(WALKeyImpl.class);
+    when(key3.estimatedSerializedSizeOf()).thenReturn(300L);
+    when(key3.getSequenceId()).thenReturn(3L);
+    FlushDescriptor fd3 =
+      ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 1L, committedFiles);
+    WALEdit edit3 = WALEdit.createFlushWALEdit(primary, fd3);
+    sink.add(key3, edit3, rpcCall3);
+
+    // we should only call replicate once for edit3, since replica 1 is marked as failed, and the
+    // flush request cannot clear the failed replica since the flush sequence number is not greater
+    // than the sequence id of the last failed edit
+    verify(conn, times(5)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+    futures.get(4).complete(null);
+
+    ServerCall<?> rpcCall4 = mock(ServerCall.class);
+    WALKeyImpl key4 = mock(WALKeyImpl.class);
+    when(key4.estimatedSerializedSizeOf()).thenReturn(400L);
+    when(key4.getSequenceId()).thenReturn(4L);
+    WALEdit edit4 = mock(WALEdit.class);
+    when(edit4.estimatedSerializedSizeOf()).thenReturn(4000L);
+    sink.add(key4, edit4, rpcCall4);
+
+    // still, only send to replica 2
+    verify(conn, times(6)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+    futures.get(5).complete(null);
+
+    ServerCall<?> rpcCall5 = mock(ServerCall.class);
+    WALKeyImpl key5 = mock(WALKeyImpl.class);
+    when(key5.estimatedSerializedSizeOf()).thenReturn(300L);
+    when(key5.getSequenceId()).thenReturn(3L);
+    FlushDescriptor fd5 =
+      ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 4L, committedFiles);
+    WALEdit edit5 = WALEdit.createFlushWALEdit(primary, fd5);
+    sink.add(key5, edit5, rpcCall5);
+
+    futures.get(6).complete(null);
+    futures.get(7).complete(null);
+    // should have cleared the failed replica because the flush sequence number is greater than
+    // the sequence id of the last failed edit
+    verify(conn, times(8)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+  }
 }
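
To make the new per-replica bookkeeping in HBASE-26457 easier to follow, here is a minimal
standalone sketch (simplified types chosen for illustration; the real RegionReplicationSink does
this inside its synchronized add/onComplete paths): each failed replica now remembers the
sequence id of the last edit that failed, and a flush-all marker removes a replica only when the
flush sequence number is greater than that id.

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    // Sketch only: FailedReplicaMap is an illustrative name, not part of HBase.
    class FailedReplicaMap {
      // replica id -> sequence id of the last edit we failed to replicate to it
      private final Map<Integer, Long> failedReplicas = new HashMap<>();

      void markFailed(int replicaId, long failedSequenceId) {
        failedReplicas.put(replicaId, failedSequenceId);
      }

      // On a flush-all marker, drop only the replicas whose failure the flush covers.
      void onFlushAll(long flushSequenceNumber) {
        for (Iterator<Map.Entry<Integer, Long>> iter =
            failedReplicas.entrySet().iterator(); iter.hasNext();) {
          Map.Entry<Integer, Long> entry = iter.next();
          if (entry.getValue().longValue() < flushSequenceNumber) {
            iter.remove();
          }
        }
      }

      boolean shouldSkip(int replicaId) {
        return failedReplicas.containsKey(replicaId);
      }
    }

This is what testNotClearFailedReplica above checks: a COMMIT_FLUSH carrying flush sequence
number 1 must not clear a failure recorded at sequence id 2, while the later flush at sequence
number 4 does.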