You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/11/16 07:07:51 UTC

[hbase] 03/05: HBASE-26413 Limit the total size of buffered region replication entries (#3844)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-26233
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 13423d0d39b04f569f794a4362ce0f759d68b954
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Sat Nov 13 10:45:54 2021 +0800

    HBASE-26413 Limit the total size of buffered region replication entries (#3844)
    
    Signed-off-by: GeorryHuang <hu...@apache.org>
---
 .../apache/hadoop/hbase/regionserver/HRegion.java  |  12 +-
 .../hadoop/hbase/regionserver/HRegionServer.java   |  10 +-
 .../hbase/regionserver/RegionServerServices.java   |  12 ++
 .../RegionReplicationBufferManager.java            | 145 +++++++++++++++
 .../RegionReplicationSink.java                     | 106 ++++++++---
 .../hadoop/hbase/regionserver/wal/WALUtil.java     |   2 +-
 .../hadoop/hbase/MockRegionServerServices.java     |  15 +-
 .../hadoop/hbase/master/MockRegionServer.java      |  13 +-
 .../TestRegionReplicationBufferManager.java        | 125 +++++++++++++
 .../TestRegionReplicationSink.java                 | 194 +++++++++++++++++++++
 .../TestMetaRegionReplicaReplication.java          |   5 -
 11 files changed, 595 insertions(+), 44 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index d3da9f3..f533d23 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -145,6 +145,7 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.Write
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.ForbidMajorCompactionChecker;
+import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationSink;
 import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.regionserver.throttle.StoreHotnessProtector;
@@ -1104,11 +1105,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       return;
     }
     status.setStatus("Initializaing region replication sink");
-    regionReplicationSink = Optional.of(new RegionReplicationSink(conf, regionInfo, td, () -> {
-      rss.getFlushRequester().requestFlush(this, new ArrayList<>(td.getColumnFamilyNames()),
-        FlushLifeCycleTracker.DUMMY);
-    }, rss.getAsyncClusterConnection()));
-
+    regionReplicationSink = Optional.of(new RegionReplicationSink(conf, regionInfo, td,
+      rss.getRegionReplicationBufferManager(), () -> rss.getFlushRequester().requestFlush(this,
+        new ArrayList<>(td.getColumnFamilyNames()), FlushLifeCycleTracker.DUMMY),
+      rss.getAsyncClusterConnection()));
   }
 
   /**
@@ -2491,7 +2491,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     boolean isCompactionNeeded();
   }
 
-  FlushResultImpl flushcache(boolean flushAllStores, boolean writeFlushRequestWalMarker,
+  // Public so that RegionReplicationBufferManager (regionreplication package) can force a flush
+  // that writes a flush marker, which lets the replication sink drop its pending entries.
+  public FlushResultImpl flushcache(boolean flushAllStores, boolean writeFlushRequestWalMarker,
     FlushLifeCycleTracker tracker) throws IOException {
     List<byte[]> families = null;
     if (flushAllStores) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 4bf2d9c..d81c5b3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -126,6 +126,7 @@ import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
 import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
 import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet;
 import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet;
+import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
 import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
@@ -459,6 +460,9 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
   // A timer to shutdown the process if abort takes too long
   private Timer abortMonitor;
 
+  // Limits the total size of buffered region replication entries across all hosted regions
+  private RegionReplicationBufferManager regionReplicationBufferManager;
+
   /**
    * Starts a HRegionServer at the default location.
    * <p/>
@@ -643,6 +647,7 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
       initializeZooKeeper();
       setupClusterConnection();
       bootstrapNodeManager = new BootstrapNodeManager(asyncClusterConnection, masterAddressTracker);
+      regionReplicationBufferManager = new RegionReplicationBufferManager(this);
       // Setup RPC client for master communication
       this.rpcClient = asyncClusterConnection.getRpcClient();
     } catch (Throwable t) {
@@ -881,7 +886,7 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
       closeUserRegions(abortRequested.get());
       LOG.info("stopping server " + this.serverName);
     }
-
+    regionReplicationBufferManager.stop();
     closeClusterConnection();
     // Closing the compactSplit thread before closing meta regions
     if (!this.killed && containsMetaTableRegions()) {
@@ -3495,4 +3500,9 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
     shutdownChore(fsUtilizationChore);
     shutdownChore(slowLogTableOpsChore);
   }
+
+  @Override
+  public RegionReplicationBufferManager getRegionReplicationBufferManager() {
+    return regionReplicationBufferManager;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index fce8df1..ec27fb2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
 import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
 import org.apache.hadoop.hbase.quotas.RegionSizeStore;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
+import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.access.AccessChecker;
 import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
@@ -317,4 +318,21 @@ public interface RegionServerServices extends Server, MutableOnlineRegions, Favo
    * @return {@link ZKPermissionWatcher}
    */
   ZKPermissionWatcher getZKPermissionWatcher();
+
+  /**
+   * @return the {@link RegionReplicationBufferManager} which limits the total size of buffered
+   *         region replication entries for all the regions on this region server
+   */
+  RegionReplicationBufferManager getRegionReplicationBufferManager();
+
+  // The region accessors below narrow the return type to HRegion, so callers such as
+  // RegionReplicationBufferManager can use HRegion methods without casting.
+  @Override
+  HRegion getRegion(String encodedRegionName);
+
+  @Override
+  List<HRegion> getRegions(TableName tableName) throws IOException;
+
+  @Override
+  List<HRegion> getRegions();
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationBufferManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationBufferManager.java
new file mode 100644
index 0000000..bda3cb4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationBufferManager.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.regionreplication;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Manages the buffer size for all {@link RegionReplicationSink}s on a region server.
+ * <p/>
+ * If the buffer size exceeds the soft limit, we find the region with the largest pending size
+ * and trigger a flush, so its sink can drop all the pending entries and free memory.
+ * <p/>
+ * If the buffer size exceeds the hard limit, {@link #increase(long)} returns {@code false} so
+ * that the {@link RegionReplicationSink} drops the edits immediately.
+ */
+@InterfaceAudience.Private
+public class RegionReplicationBufferManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RegionReplicationBufferManager.class);
+
+  /**
+   * This is the total size of pending entries for all the sinks.
+   */
+  public static final String MAX_PENDING_SIZE = "hbase.region.read-replica.sink.max-pending-size";
+
+  public static final long MAX_PENDING_SIZE_DEFAULT = 100L * 1024 * 1024;
+
+  /**
+   * The fraction of {@link #MAX_PENDING_SIZE} above which we start to flush regions.
+   */
+  public static final String SOFT_LIMIT_PERCENTAGE =
+    "hbase.region.read-replica.sink.max-pending-size.soft-limit-percentage";
+
+  public static final float SOFT_LIMIT_PERCENTAGE_DEFAULT = 0.8f;
+
+  private final RegionServerServices rsServices;
+
+  private final long maxPendingSize;
+
+  private final long softMaxPendingSize;
+
+  private final AtomicLong pendingSize = new AtomicLong();
+
+  private final ThreadPoolExecutor executor;
+
+  public RegionReplicationBufferManager(RegionServerServices rsServices) {
+    this.rsServices = rsServices;
+    Configuration conf = rsServices.getConfiguration();
+    this.maxPendingSize = conf.getLong(MAX_PENDING_SIZE, MAX_PENDING_SIZE_DEFAULT);
+    this.softMaxPendingSize =
+      (long) (conf.getFloat(SOFT_LIMIT_PERCENTAGE, SOFT_LIMIT_PERCENTAGE_DEFAULT) * maxPendingSize);
+    // At most one flush runs at a time: with a single thread and a SynchronousQueue, a flush
+    // requested while another one is in progress is rejected, and the handler just drops it.
+    this.executor = new ThreadPoolExecutor(
+      1, 1, 1, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactoryBuilder()
+        .setDaemon(true).setNameFormat("Region-Replication-Flusher-%d").build(),
+      (r, e) -> LOG.debug("A flush task is ongoing, drop the new scheduled one"));
+    executor.allowCoreThreadTimeOut(true);
+  }
+
+  /**
+   * Flushes the region whose replication sink has the largest pending size. The flush writes a
+   * flush marker, so the sink can drop its pending entries. Only logs a warning if no sink has
+   * any pending entries.
+   */
+  private void flush() {
+    long max = 0;
+    HRegion toFlush = null;
+    for (HRegion region : rsServices.getRegions()) {
+      Optional<RegionReplicationSink> sink = region.getRegionReplicationSink();
+      if (sink.isPresent()) {
+        RegionReplicationSink s = sink.get();
+        long p = s.pendingSize();
+        // max starts at 0, so a sink with nothing pending is never chosen for flushing
+        if (p > max) {
+          max = p;
+          toFlush = region;
+        }
+      }
+    }
+    if (toFlush != null) {
+      // here we need to write flush marker out, so we can drop all the pending edits in the region
+      // replication sink.
+      try {
+        LOG.info("Going to flush {} with {} pending entry size", toFlush.getRegionInfo(),
+          StringUtils.TraditionalBinaryPrefix.long2String(max, "", 1));
+        FlushResult result = toFlush.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
+        if (!result.isFlushSucceeded()) {
+          LOG.warn("Failed to flush {}, the result is {}", toFlush.getRegionInfo(),
+            result.getResult());
+        }
+      } catch (IOException e) {
+        LOG.warn("Failed to flush {}", toFlush.getRegionInfo(), e);
+      }
+    } else {
+      // no sink has pending entries, e.g. they were all released after increase() scheduled
+      // this flush. Usually this should not happen, and there is no real harm, so just log it.
+      LOG.warn("Can not find a region to flush");
+    }
+  }
+
+  /**
+   * Adds {@code size} to the total pending size, and schedules a flush if the total exceeds the
+   * soft limit.
+   * <p/>
+   * Return whether we should just drop all the edits, if we have reached the hard limit of max
+   * pending size.
+   * @param size the estimated size of the newly buffered entry
+   * @return {@code true} means OK, {@code false} means drop all the edits.
+   */
+  public boolean increase(long size) {
+    long sz = pendingSize.addAndGet(size);
+    if (sz > softMaxPendingSize) {
+      executor.execute(this::flush);
+    }
+    return sz <= maxPendingSize;
+  }
+
+  /**
+   * Called after you ship the edits out, or drop them, to release their size.
+   * @param size the total estimated size of the released entries
+   */
+  public void decrease(long size) {
+    pendingSize.addAndGet(-size);
+  }
+
+  /**
+   * Shuts down the flush executor. Flushes requested after this are rejected and dropped.
+   */
+  public void stop() {
+    executor.shutdown();
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
similarity index 74%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
index cdd77e8..9c6f6e2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.regionserver;
+package org.apache.hadoop.hbase.regionserver.regionreplication;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,10 +61,6 @@ public class RegionReplicationSink {
 
   private static final Logger LOG = LoggerFactory.getLogger(RegionReplicationSink.class);
 
-  public static final String MAX_PENDING_SIZE = "hbase.region.read-replica.sink.max-pending-size";
-
-  public static final long MAX_PENDING_SIZE_DEFAULT = 10L * 1024 * 1024;
-
   public static final String RETRIES_NUMBER = "hbase.region.read-replica.sink.retries.number";
 
   public static final int RETRIES_NUMBER_DEFAULT = 3;
@@ -85,10 +82,13 @@ public class RegionReplicationSink {
 
     final ServerCall<?> rpcCall;
 
+    final long size;
+
     SinkEntry(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
       this.key = key;
       this.edit = edit;
       this.rpcCall = rpcCall;
+      this.size = key.estimatedSerializedSizeOf() + edit.estimatedSerializedSizeOf();
       if (rpcCall != null) {
         // increase the reference count to avoid the rpc framework free the memory before we
         // actually sending them out.
@@ -112,6 +112,11 @@ public class RegionReplicationSink {
 
   private final TableDescriptor tableDesc;
 
+  // cached from the TableDescriptor so we do not call getRegionReplication() repeatedly
+  private final int regionReplication;
+
+  private final RegionReplicationBufferManager manager;
+
   private final Runnable flushRequester;
 
   private final AsyncClusterConnection conn;
@@ -128,20 +133,24 @@ public class RegionReplicationSink {
 
   private final long operationTimeoutNs;
 
+  private volatile long pendingSize;
+
   private boolean sending;
 
   private boolean stopping;
 
   private boolean stopped;
 
-  RegionReplicationSink(Configuration conf, RegionInfo primary, TableDescriptor td,
-    Runnable flushRequester, AsyncClusterConnection conn) {
+  public RegionReplicationSink(Configuration conf, RegionInfo primary, TableDescriptor td,
+    RegionReplicationBufferManager manager, Runnable flushRequester, AsyncClusterConnection conn) {
     Preconditions.checkArgument(RegionReplicaUtil.isDefaultReplica(primary), "%s is not primary",
       primary);
-    Preconditions.checkArgument(td.getRegionReplication() > 1,
-      "region replication should be greater than 1 but got %s", td.getRegionReplication());
+    this.regionReplication = td.getRegionReplication();
+    Preconditions.checkArgument(this.regionReplication > 1,
+      "region replication should be greater than 1 but got %s", this.regionReplication);
     this.primary = primary;
     this.tableDesc = td;
+    this.manager = manager;
     this.flushRequester = flushRequester;
     this.conn = conn;
     this.retries = conf.getInt(RETRIES_NUMBER, RETRIES_NUMBER_DEFAULT);
@@ -153,7 +162,12 @@ public class RegionReplicationSink {
 
   private void onComplete(List<SinkEntry> sent,
     Map<Integer, MutableObject<Throwable>> replica2Error) {
-    sent.forEach(SinkEntry::replicated);
+    long toReleaseSize = 0;
+    for (SinkEntry entry : sent) {
+      entry.replicated();
+      toReleaseSize += entry.size;
+    }
+    manager.decrease(toReleaseSize);
     Set<Integer> failed = new HashSet<>();
     for (Map.Entry<Integer, MutableObject<Throwable>> entry : replica2Error.entrySet()) {
       Integer replicaId = entry.getKey();
@@ -165,6 +179,7 @@ public class RegionReplicationSink {
       }
     }
     synchronized (entries) {
+      pendingSize -= toReleaseSize;
       if (!failed.isEmpty()) {
         failedReplicas.addAll(failed);
         flushRequester.run();
@@ -190,7 +205,7 @@ public class RegionReplicationSink {
       }
       toSend.add(entry);
     }
-    int toSendReplicaCount = tableDesc.getRegionReplication() - 1 - failedReplicas.size();
+    int toSendReplicaCount = regionReplication - 1 - failedReplicas.size();
     if (toSendReplicaCount <= 0) {
       return;
     }
@@ -199,7 +214,7 @@ public class RegionReplicationSink {
       toSend.stream().map(e -> new WAL.Entry(e.key, e.edit)).collect(Collectors.toList());
     AtomicInteger remaining = new AtomicInteger(toSendReplicaCount);
     Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
-    for (int replicaId = 1; replicaId < tableDesc.getRegionReplication(); replicaId++) {
+    for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
       MutableObject<Throwable> error = new MutableObject<>();
       replica2Error.put(replicaId, error);
       RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, replicaId);
@@ -223,6 +238,17 @@ public class RegionReplicationSink {
     return storesFlushed.containsAll(tableDesc.getColumnFamilyNames());
   }
 
+  private void clearAllEntries() {
+    long toClearSize = 0;
+    for (SinkEntry entry : entries) {
+      toClearSize += entry.size;
+      entry.replicated();
+    }
+    entries.clear();
+    pendingSize -= toClearSize;
+    manager.decrease(toClearSize);
+  }
+
   /**
    * Add this edit to replication queue.
    * <p/>
@@ -251,31 +277,78 @@ public class RegionReplicationSink {
               continue;
             }
             if (flushDesc != null && flushAllStores(flushDesc)) {
-              LOG.debug("Got a flush all request, clear failed replicas {} and {} pending" +
-                " replication entries", failedReplicas, entries.size());
-              entries.clear();
+              long flushSequenceNumber = flushDesc.getFlushSequenceNumber();
+              int toClearCount = 0;
+              long toClearSize = 0;
+              for (;;) {
+                SinkEntry e = entries.peek();
+                if (e == null) {
+                  break;
+                }
+                if (e.key.getSequenceId() < flushSequenceNumber) {
+                  entries.poll();
+                  toClearCount++;
+                  toClearSize += e.size;
+                  // dropped without being sent, so release it the same way clearAllEntries does
+                  e.replicated();
+                } else {
+                  break;
+                }
+              }
+              // the dropped entries no longer count against the local or the global pending size
+              pendingSize -= toClearSize;
+              manager.decrease(toClearSize);
+              // log before clearing failedReplicas, otherwise it is always printed as empty
+              LOG.debug(
+                "Got a flush all request with sequence id {}, clear failed replicas {}" +
+                  " and {} pending entries with size {}",
+                flushSequenceNumber, failedReplicas, toClearCount,
+                StringUtils.TraditionalBinaryPrefix.long2String(toClearSize, "", 1));
               failedReplicas.clear();
             }
           }
         }
       }
-      // TODO: limit the total cached entries here, and we should have a global limitation, not for
-      // only this region.
-      entries.add(new SinkEntry(key, edit, rpcCall));
-      if (!sending) {
-        send();
+      if (failedReplicas.size() == regionReplication - 1) {
+        // this means we have marked all the replicas as failed, so just give up here
+        return;
+      }
+      SinkEntry entry = new SinkEntry(key, edit, rpcCall);
+      entries.add(entry);
+      pendingSize += entry.size;
+      if (manager.increase(entry.size)) {
+        if (!sending) {
+          send();
+        }
+      } else {
+        // we have run out of the max pending size, drop all the edits, and mark all replicas as
+        // failed
+        clearAllEntries();
+        for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
+          failedReplicas.add(replicaId);
+        }
+        flushRequester.run();
       }
     }
   }

+  /**
+   * Returns the total estimated size of the entries in this sink that are still queued or being
+   * sent. Used by {@link RegionReplicationBufferManager} to choose which region to flush.
+   */
+  long pendingSize() {
+    return pendingSize;
+  }
+
   /**
    * Stop the replication sink.
    * <p/>
    * Usually this should only be called when you want to close a region.
    */
-  void stop() {
+  public void stop() {
     synchronized (entries) {
       stopping = true;
+      clearAllEntries();
       if (!sending) {
         stopped = true;
         entries.notifyAll();
@@ -291,7 +364,7 @@ public class RegionReplicationSink {
    * <p/>
    * This is used to keep the replicating order the same with the WAL edit order when writing.
    */
-  void waitUntilStopped() throws InterruptedException {
+  public void waitUntilStopped() throws InterruptedException {
     synchronized (entries) {
       while (!stopped) {
         entries.wait();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
index 15be95e..2076dd4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
-import org.apache.hadoop.hbase.regionserver.RegionReplicationSink;
+import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationSink;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.wal.WAL;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index 82f7a85..ebe6edd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -47,13 +47,13 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
 import org.apache.hadoop.hbase.regionserver.LeaseManager;
 import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
-import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
 import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
 import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
+import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.access.AccessChecker;
 import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
@@ -72,7 +72,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
  */
 public class MockRegionServerServices implements RegionServerServices {
   protected static final Logger LOG = LoggerFactory.getLogger(MockRegionServerServices.class);
-  private final Map<String, Region> regions = new HashMap<>();
+  private final Map<String, HRegion> regions = new HashMap<>();
   private final ConcurrentSkipListMap<byte[], Boolean> rit =
     new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
   private HFileSystem hfs = null;
@@ -108,17 +108,17 @@ public class MockRegionServerServices implements RegionServerServices {
   }
 
   @Override
-  public Region getRegion(String encodedRegionName) {
+  public HRegion getRegion(String encodedRegionName) {
     return this.regions.get(encodedRegionName);
   }
 
   @Override
-  public List<Region> getRegions(TableName tableName) throws IOException {
+  public List<HRegion> getRegions(TableName tableName) throws IOException {
     return null;
   }
 
   @Override
-  public List<Region> getRegions() {
+  public List<HRegion> getRegions() {
     return null;
   }
 
@@ -379,4 +379,9 @@ public class MockRegionServerServices implements RegionServerServices {
   public AsyncClusterConnection getAsyncClusterConnection() {
     return null;
   }
+
+  @Override
+  public RegionReplicationBufferManager getRegionReplicationBufferManager() {
+    return null;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 69a7a79..56813af 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -61,17 +61,16 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HeapMemoryManager;
 import org.apache.hadoop.hbase.regionserver.LeaseManager;
 import org.apache.hadoop.hbase.regionserver.MetricsRegionServer;
-import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
 import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
 import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
+import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.access.AccessChecker;
 import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@@ -138,6 +137,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBul
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
 
@@ -463,7 +463,7 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
   }
 
   @Override
-  public List<Region> getRegions() {
+  public List<HRegion> getRegions() {
     return null;
   }
 
@@ -527,7 +527,7 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
   }
 
   @Override
-  public List<Region> getRegions(TableName tableName) throws IOException {
+  public List<HRegion> getRegions(TableName tableName) throws IOException {
     return null;
   }
 
@@ -750,4 +750,9 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
   public AsyncClusterConnection getAsyncClusterConnection() {
     return null;
   }
+
+  @Override
+  public RegionReplicationBufferManager getRegionReplicationBufferManager() {
+    return null;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationBufferManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationBufferManager.java
new file mode 100644
index 0000000..8b56d09
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationBufferManager.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.regionreplication;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestRegionReplicationBufferManager {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRegionReplicationBufferManager.class);
+
+  private Configuration conf;
+
+  private RegionServerServices rsServices;
+
+  private RegionReplicationBufferManager manager;
+
+  @Before
+  public void setUp() {
+    conf = HBaseConfiguration.create();
+    rsServices = mock(RegionServerServices.class);
+    when(rsServices.getConfiguration()).thenReturn(conf);
+  }
+
+  @After
+  public void tearDown() {
+    if (manager != null) {
+      manager.stop();
+    }
+  }
+
+  private HRegion mockRegion(RegionInfo regionInfo, long pendingSize) throws IOException {
+    HRegion region = mock(HRegion.class);
+    when(region.getRegionInfo()).thenReturn(regionInfo);
+    if (pendingSize < 0) { // negative pendingSize: the region has no replication sink
+      when(region.getRegionReplicationSink()).thenReturn(Optional.empty());
+    } else {
+      RegionReplicationSink sink = mock(RegionReplicationSink.class);
+      when(sink.pendingSize()).thenReturn(pendingSize);
+      when(region.getRegionReplicationSink()).thenReturn(Optional.of(sink));
+    }
+    return region;
+  }
+
+  @Test
+  public void testScheduleFlush() throws IOException, InterruptedException {
+    conf.setLong(RegionReplicationBufferManager.MAX_PENDING_SIZE, 1024 * 1024);
+    manager = new RegionReplicationBufferManager(rsServices);
+    RegionInfo info1 = RegionInfoBuilder.newBuilder(TableName.valueOf("info1")).build();
+    RegionInfo info2 = RegionInfoBuilder.newBuilder(TableName.valueOf("info2")).build();
+    HRegion region1 = mockRegion(info1, 1000);
+    HRegion region2 = mockRegion(info2, 10000);
+    when(rsServices.getRegions()).thenReturn(Arrays.asList(region1, region2));
+    CountDownLatch arrive = new CountDownLatch(1); // counted down once region2 starts flushing
+    CountDownLatch resume = new CountDownLatch(1); // counted down to let the flush finish
+    when(region2.flushcache(anyBoolean(), anyBoolean(), any())).then(i -> {
+      arrive.countDown();
+      resume.await();
+      FlushResultImpl result = mock(FlushResultImpl.class);
+      when(result.isFlushSucceeded()).thenReturn(true);
+      return result;
+    });
+    // exceed the soft limit: increase() still succeeds and a background flush is scheduled
+    assertTrue(manager.increase(1000 * 1024));
+    arrive.await();
+
+    // the flush task should have called getRegions exactly once to pick the region to flush
+    verify(rsServices, times(1)).getRegions();
+
+    // exceed the hard limit: increase() now fails, and since the first flush task is still
+    // blocked (resume.countDown() has not been called yet), scheduling another flush task is
+    // silently skipped.
+    assertFalse(manager.increase(100 * 1024));
+    resume.countDown();
+
+    // wait 2 seconds, then make sure no second flush task has called getRegions again
+    Thread.sleep(2000);
+    verify(rsServices, times(1)).getRegions();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
new file mode 100644
index 0000000..19b1698
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.regionreplication;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableNameTestRule;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.ipc.ServerCall;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestRegionReplicationSink {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRegionReplicationSink.class);
+
+  private Configuration conf;
+
+  private TableDescriptor td;
+
+  private RegionInfo primary;
+
+  private Runnable flushRequester;
+
+  private AsyncClusterConnection conn;
+
+  private RegionReplicationBufferManager manager;
+
+  @Rule
+  public final TableNameTestRule name = new TableNameTestRule();
+
+  @Before
+  public void setUp() {
+    conf = HBaseConfiguration.create();
+    td = TableDescriptorBuilder.newBuilder(name.getTableName())
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf")).setRegionReplication(3).build();
+    primary = RegionInfoBuilder.newBuilder(name.getTableName()).build();
+    flushRequester = mock(Runnable.class);
+    conn = mock(AsyncClusterConnection.class);
+    manager = mock(RegionReplicationBufferManager.class);
+  }
+
+  private RegionReplicationSink create() {
+    return new RegionReplicationSink(conf, primary, td, manager, flushRequester, conn);
+  }
+
+  @Test
+  public void testNormal() {
+    RegionReplicationSink sink = create();
+    MutableInt next = new MutableInt(0);
+    List<CompletableFuture<Void>> futures =
+      Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
+    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
+      .then(i -> futures.get(next.getAndIncrement()));
+    ServerCall<?> rpcCall = mock(ServerCall.class);
+    WALKeyImpl key = mock(WALKeyImpl.class);
+    when(key.estimatedSerializedSizeOf()).thenReturn(100L);
+    WALEdit edit = mock(WALEdit.class);
+    when(edit.estimatedSerializedSizeOf()).thenReturn(1000L);
+    when(manager.increase(anyLong())).thenReturn(true);
+
+    sink.add(key, edit, rpcCall);
+    // should call increase on the manager
+    verify(manager, times(1)).increase(anyLong());
+    // the rpc call should have been retained
+    verify(rpcCall, times(1)).retainByWAL();
+    assertEquals(1100, sink.pendingSize()); // key size 100 + edit size 1000
+
+    futures.get(0).complete(null);
+    // only one of the two secondary replicas has finished, so should not call decrease yet
+    verify(manager, never()).decrease(anyLong());
+    // nor release the rpc call yet
+    verify(rpcCall, never()).releaseByWAL();
+    assertEquals(1100, sink.pendingSize());
+
+    futures.get(1).complete(null);
+    // both secondary replicas have finished, so should call decrease now
+    verify(manager, times(1)).decrease(anyLong());
+    // and should release the rpc call
+    verify(rpcCall, times(1)).releaseByWAL();
+    assertEquals(0, sink.pendingSize());
+  }
+
+  @Test
+  public void testDropEdits() {
+    RegionReplicationSink sink = create();
+    MutableInt next = new MutableInt(0);
+    List<CompletableFuture<Void>> futures =
+      Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
+    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
+      .then(i -> futures.get(next.getAndIncrement()));
+    ServerCall<?> rpcCall1 = mock(ServerCall.class);
+    WALKeyImpl key1 = mock(WALKeyImpl.class);
+    when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
+    WALEdit edit1 = mock(WALEdit.class);
+    when(edit1.estimatedSerializedSizeOf()).thenReturn(1000L);
+    when(manager.increase(anyLong())).thenReturn(true);
+
+    sink.add(key1, edit1, rpcCall1);
+    verify(manager, times(1)).increase(anyLong());
+    verify(manager, never()).decrease(anyLong());
+    verify(rpcCall1, times(1)).retainByWAL();
+    assertEquals(1100, sink.pendingSize()); // 100 + 1000
+
+    ServerCall<?> rpcCall2 = mock(ServerCall.class);
+    WALKeyImpl key2 = mock(WALKeyImpl.class);
+    when(key2.estimatedSerializedSizeOf()).thenReturn(200L);
+    WALEdit edit2 = mock(WALEdit.class);
+    when(edit2.estimatedSerializedSizeOf()).thenReturn(2000L);
+
+    sink.add(key2, edit2, rpcCall2);
+    verify(manager, times(2)).increase(anyLong());
+    verify(manager, never()).decrease(anyLong());
+    verify(rpcCall2, times(1)).retainByWAL();
+    assertEquals(3300, sink.pendingSize()); // 1100 + 200 + 2000
+
+    ServerCall<?> rpcCall3 = mock(ServerCall.class);
+    WALKeyImpl key3 = mock(WALKeyImpl.class);
+    when(key3.estimatedSerializedSizeOf()).thenReturn(200L);
+    WALEdit edit3 = mock(WALEdit.class);
+    when(edit3.estimatedSerializedSizeOf()).thenReturn(3000L);
+    when(manager.increase(anyLong())).thenReturn(false);
+
+    // the manager now rejects the increase, so this edit should not be buffered
+    sink.add(key3, edit3, rpcCall3);
+    verify(manager, times(3)).increase(anyLong());
+    verify(manager, times(1)).decrease(anyLong());
+    // should retain and then release immediately
+    verify(rpcCall3, times(1)).retainByWAL();
+    verify(rpcCall3, times(1)).releaseByWAL();
+    // should also drop the queued edit2, which was not sent yet, and release its rpc call
+    verify(rpcCall2, times(1)).releaseByWAL();
+    assertEquals(1100, sink.pendingSize()); // only the in-flight edit1 is left
+    // should have requested a flush
+    verify(flushRequester, times(1)).run();
+
+    // finish the replication of the first edit: we should decrease the size, release the rpc
+    // call, and pendingSize should be 0 as there are no pending entries left
+    futures.forEach(f -> f.complete(null));
+    verify(manager, times(2)).decrease(anyLong());
+    verify(rpcCall1, times(1)).releaseByWAL();
+    assertEquals(0, sink.pendingSize());
+
+    // replicate should only have been called twice, i.e. once per secondary replica, and only
+    // for the first edit
+    verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java
index b501ab2..a4da640 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java
@@ -86,11 +86,6 @@ public class TestMetaRegionReplicaReplication {
   @Before
   public void before() throws Exception {
     Configuration conf = HTU.getConfiguration();
-    conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
-    conf.setInt("replication.source.size.capacity", 10240);
-    conf.setLong("replication.source.sleepforretries", 100);
-    conf.setInt("hbase.regionserver.maxlogs", 10);
-    conf.setLong("hbase.master.logcleaner.ttl", 10);
     conf.setInt("zookeeper.recovery.retry", 1);
     conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
     conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);