Posted to commits@hbase.apache.org by zh...@apache.org on 2021/11/22 13:50:35 UTC

[hbase] branch HBASE-26233 updated: HBASE-26416 Implement a new method for region replication instead of using replay (#3864)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-26233
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-26233 by this push:
     new 5848844  HBASE-26416 Implement a new method for region replication instead of using replay (#3864)
5848844 is described below

commit 584884486bffadf139949519e125b48e5918f92a
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Mon Nov 22 21:50:02 2021 +0800

    HBASE-26416 Implement a new method for region replication instead of using replay (#3864)
    
    Signed-off-by: Xiaolin Ha <ha...@apache.org>
---
 .../java/org/apache/hadoop/hbase/HConstants.java   |   6 +
 .../src/main/protobuf/server/region/Admin.proto    |   5 +
 .../AsyncRegionReplicationRetryingCaller.java      |   2 +-
 .../hadoop/hbase/master/MasterRpcServices.java     |   6 +
 .../apache/hadoop/hbase/regionserver/HRegion.java  | 328 ++++++++++++++---
 .../hadoop/hbase/regionserver/RSRpcServices.java   |  67 +++-
 .../RegionReplicationFlushRequester.java           |   1 +
 .../regionreplication/RegionReplicationSink.java   |  87 ++---
 .../org/apache/hadoop/hbase/wal/WALSplitUtil.java  |   2 +
 .../hadoop/hbase/master/MockRegionServer.java      |   6 +
 .../regionserver/TestHRegionReplayEvents.java      |   2 +-
 .../hbase/regionserver/TestReplicateToReplica.java | 388 +++++++++++++++++++++
 .../TestRegionReplicationSink.java                 |  91 +----
 .../TestMetaRegionReplicaReplication.java          |   1 -
 .../regionserver/TestRegionReplicaReplication.java |   1 -
 15 files changed, 795 insertions(+), 198 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 6464158..14701f5 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1192,7 +1192,13 @@ public final class HConstants {
   public static final int PRIORITY_UNSET = -1;
   public static final int NORMAL_QOS = 0;
   public static final int REPLICATION_QOS = 5;
+  /**
+   * @deprecated since 3.0.0, will be removed in 4.0.0. DLR has been purged for a long time and
+   *             region replication has its own 'replay' method.
+   */
+  @Deprecated
   public static final int REPLAY_QOS = 6;
+  public static final int REGION_REPLICATION_QOS = REPLAY_QOS;
   public static final int QOS_THRESHOLD = 10;
   public static final int ADMIN_QOS = 100;
   public static final int HIGH_QOS = 200;
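
(Not part of this commit, for context only.) The alias keeps the numeric RPC priority identical, so during a rolling upgrade an old client calling replay and a new client calling replicateToReplica are scheduled in the same handler pool. A minimal sketch assuming the HBase classpath; the class name is hypothetical:

    import org.apache.hadoop.hbase.HConstants;

    public class QosAliasCheck {
      public static void main(String[] args) {
        // Both constants resolve to the same value, so the wire-level priority is
        // unchanged; only the name is modernized.
        System.out.println(HConstants.REPLAY_QOS);             // 6
        System.out.println(HConstants.REGION_REPLICATION_QOS); // 6
      }
    }
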
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
index 0667292..89b9985 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
@@ -360,6 +360,11 @@ service AdminService {
     returns(ReplicateWALEntryResponse);
 
   rpc Replay(ReplicateWALEntryRequest)
+    returns(ReplicateWALEntryResponse) {
+    option deprecated = true;
+  };
+
+  rpc ReplicateToReplica(ReplicateWALEntryRequest)
     returns(ReplicateWALEntryResponse);
 
   rpc RollWALWriter(RollWALWriterRequest)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java
index a0ce418..c854ba3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java
@@ -67,7 +67,7 @@ public class AsyncRegionReplicationRetryingCaller extends AsyncRpcRetryingCaller
       .buildReplicateWALEntryRequest(entries, replica.getEncodedNameAsBytes(), null, null, null);
     resetCallTimeout();
     controller.setCellScanner(pair.getSecond());
-    stub.replay(controller, pair.getFirst(), r -> {
+    stub.replicateToReplica(controller, pair.getFirst(), r -> {
       if (controller.failed()) {
         onError(controller.getFailed(),
           () -> "Call to " + loc.getServerName() + " for " + replica + " failed",
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 46bc8c2..d231796 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -3492,4 +3492,10 @@ public class MasterRpcServices extends HBaseRpcServicesBase<HMaster>
       .forEach(builder::addServer);
     return builder.build();
   }
+
+  @Override
+  public ReplicateWALEntryResponse replicateToReplica(RpcController controller,
+    ReplicateWALEntryRequest request) throws ServiceException {
+    throw new ServiceException(new DoNotRetryIOException("Unsupported method on master"));
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 92b8734..eaee74b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -65,6 +65,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -197,6 +198,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
@@ -355,7 +357,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   private Path regionDir;
   private FileSystem walFS;
 
-  // set to true if the region is restored from snapshot
+  // set to true if the region is restored from snapshot for reading by ClientSideRegionScanner
   private boolean isRestoredRegion = false;
 
   public void setRestoredRegion(boolean restoredRegion) {
@@ -411,9 +413,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   // The following map is populated when opening the region
   Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
 
+  // lock used to protect the replay operation for secondary replicas, so the below two fields do
+  // not need to be volatile.
+  private Lock replayLock;
+
   /** Saved state from replaying prepare flush cache */
   private PrepareFlushResult prepareFlushResult = null;
 
+  private long lastReplayedSequenceId = HConstants.NO_SEQNUM;
+
   private volatile ConfigurationManager configurationManager;
 
   // Used for testing.
@@ -1072,7 +1080,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             LOG.debug("Failed to clean up wrong region WAL directory {}", wrongRegionWALDir);
           }
         }
+      } else {
+        lastReplayedSequenceId = nextSeqId - 1;
+        replayLock = new ReentrantLock();
       }
+      initializeRegionReplicationSink(reporter, status);
     }
 
     LOG.info("Opened {}; next sequenceid={}; {}, {}", this.getRegionInfo().getShortNameToLog(),
@@ -1087,7 +1099,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       status.setStatus("Running coprocessor post-open hooks");
       coprocessorHost.postOpen();
     }
-    initializeRegionReplicationSink(reporter, status);
     status.markComplete("Region opened successfully");
     return nextSeqId;
   }
@@ -1241,6 +1252,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor(
       RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), mvcc.getReadPoint(),
       getRegionServerServices().getServerName(), storeFiles);
+    // we do not care about the region close event at the secondary replica side, so just pass a null
+    // RegionReplicationSink
     WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionEventDesc,
       mvcc, null);
 
@@ -1683,7 +1696,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
       }
     }
-
+    if (regionReplicationSink.isPresent()) {
+      // stop replicating to secondary replicas
+      // the open event marker can make secondary replicas refresh store files and catch up with
+      // everything, so here we just give up replicating later edits to speed up the reopen process
+      RegionReplicationSink sink = regionReplicationSink.get();
+      sink.stop();
+      try {
+        regionReplicationSink.get().waitUntilStopped();
+      } catch (InterruptedException e) {
+        throw throwOnInterrupt(e);
+      }
+    }
     // Set the closing flag
     // From this point new arrivals at the region lock will get NSRE.
 
@@ -1887,16 +1911,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         RegionReplicaUtil.isDefaultReplica(getRegionInfo())) {
         writeRegionCloseMarker(wal);
       }
-      if (regionReplicationSink.isPresent()) {
-        // stop replicating to secondary replicas
-        RegionReplicationSink sink = regionReplicationSink.get();
-        sink.stop();
-        try {
-          regionReplicationSink.get().waitUntilStopped();
-        } catch (InterruptedException e) {
-          throw throwOnInterrupt(e);
-        }
-      }
       this.closed.set(true);
       if (!canFlush) {
         decrMemStoreSize(this.memStoreSizing.getMemStoreSize());
@@ -2857,7 +2871,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             getRegionInfo(), flushOpSeqId, committedFiles);
         // No sync. Sync is below where no updates lock and we do FlushAction.COMMIT_FLUSH
         WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false,
-          mvcc, null);
+          mvcc, regionReplicationSink.orElse(null));
       }
 
       // Prepare flush (take a snapshot)
@@ -2955,7 +2969,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   /**
    * Writes a marker to WAL indicating a flush is requested but cannot be complete due to various
    * reasons. Ignores exceptions from WAL. Returns whether the write succeeded.
-   * @param wal
    * @return whether WAL write was successful
    */
   private boolean writeFlushRequestMarkerToWAL(WAL wal, boolean writeFlushWalMarker) {
@@ -2964,11 +2977,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         getRegionInfo(), -1, new TreeMap<>(Bytes.BYTES_COMPARATOR));
       try {
         WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true, mvcc,
-          null);
+          regionReplicationSink.orElse(null));
         return true;
       } catch (IOException e) {
-        LOG.warn(getRegionInfo().getEncodedName() + " : "
-            + "Received exception while trying to write the flush request to wal", e);
+        LOG.warn(getRegionInfo().getEncodedName() + " : " +
+          "Received exception while trying to write the flush request to wal", e);
       }
     }
     return false;
@@ -4413,7 +4426,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   /**
    * Batch of mutations for replay. Base class is shared with {@link MutationBatchOperation} as most
    * of the logic is same.
+   * @deprecated Since 3.0.0, will be removed in 4.0.0. We no longer use this operation to apply
+   *             edits on the secondary replica side.
    */
+  @Deprecated
   private static final class ReplayBatchOperation extends BatchOperation<MutationReplay> {
 
     private long origLogSeqNum = 0;
@@ -4551,8 +4567,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       () -> createRegionSpan("Region.batchMutate"));
   }
 
-  public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId)
-      throws IOException {
+  /**
+   * @deprecated Since 3.0.0, will be removed in 4.0.0. Now we use
+   *             {@link #replayWALEntry(WALEntry, CellScanner)} for replaying edits on the
+   *             secondary replica side.
+   */
+  @Deprecated
+  OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId) throws IOException {
     if (!RegionReplicaUtil.isDefaultReplica(getRegionInfo())
         && replaySeqId < lastReplayedOpenRegionSeqId) {
       // if it is a secondary replica we should ignore these entries silently
@@ -5708,9 +5729,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
+  /**
+   * @deprecated Since 3.0.0, will be removed in 4.0.0. Kept only for compatibility with the old
+   *             region replica implementation.
+   */
+  @Deprecated
   void replayWALFlushMarker(FlushDescriptor flush, long replaySeqId) throws IOException {
-    checkTargetRegion(flush.getEncodedRegionName().toByteArray(),
-      "Flush marker from WAL ", flush);
+    checkTargetRegion(flush.getEncodedRegionName().toByteArray(), "Flush marker from WAL ", flush);
 
     if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
       return; // if primary nothing to do
@@ -5750,25 +5775,34 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
-  /** Replay the flush marker from primary region by creating a corresponding snapshot of
-   * the store memstores, only if the memstores do not have a higher seqId from an earlier wal
-   * edit (because the events may be coming out of order).
-   */
-  PrepareFlushResult replayWALFlushStartMarker(FlushDescriptor flush) throws IOException {
-    long flushSeqId = flush.getFlushSequenceNumber();
-
-    HashSet<HStore> storesToFlush = new HashSet<>();
-    for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
+  private Collection<HStore> getStoresToFlush(FlushDescriptor flushDesc) {
+    List<HStore> storesToFlush = new ArrayList<>();
+    for (StoreFlushDescriptor storeFlush : flushDesc.getStoreFlushesList()) {
       byte[] family = storeFlush.getFamilyName().toByteArray();
       HStore store = getStore(family);
       if (store == null) {
-        LOG.warn(getRegionInfo().getEncodedName() + " : "
-          + "Received a flush start marker from primary, but the family is not found. Ignoring"
-          + " StoreFlushDescriptor:" + TextFormat.shortDebugString(storeFlush));
+        LOG.warn(getRegionInfo().getEncodedName() + " : " +
+          "Received a flush start marker from primary, but the family is not found. Ignoring" +
+          " StoreFlushDescriptor:" + TextFormat.shortDebugString(storeFlush));
         continue;
       }
       storesToFlush.add(store);
     }
+    return storesToFlush;
+  }
+
+  /**
+   * Replay the flush marker from the primary region by creating a corresponding snapshot of the
+   * store memstores, only if the memstores do not have a higher seqId from an earlier wal edit
+   * (because the events may be coming out of order).
+   * @deprecated Since 3.0.0, will be removed in 4.0.0. Kept only for compatibility with the old
+   *             region replica implementation.
+   */
+  @Deprecated
+  PrepareFlushResult replayWALFlushStartMarker(FlushDescriptor flush) throws IOException {
+    long flushSeqId = flush.getFlushSequenceNumber();
+
+    Collection<HStore> storesToFlush = getStoresToFlush(flush);
 
     MonitoredTask status = TaskMonitor.get().createStatus("Preparing flush " + this);
 
@@ -5864,6 +5898,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return null;
   }
 
+  /**
+   * @deprecated Since 3.0.0, will be removed in 4.0.0. Kept only for compatibility with the old
+   *             region replica implementation.
+   */
+  @Deprecated
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
     justification="Intentional; post memstore flush")
   void replayWALFlushCommitMarker(FlushDescriptor flush) throws IOException {
@@ -5985,11 +6024,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   /**
    * Replays the given flush descriptor by opening the flush files in stores and dropping the
    * memstore snapshots if requested.
-   * @param flush
-   * @param prepareFlushResult
-   * @param dropMemstoreSnapshot
-   * @throws IOException
+   * @deprecated Since 3.0.0, will be removed in 4.0.0. Kept only for compatibility with the old
+   *             region replica implementation.
    */
+  @Deprecated
   private void replayFlushInStores(FlushDescriptor flush, PrepareFlushResult prepareFlushResult,
       boolean dropMemstoreSnapshot)
       throws IOException {
@@ -6083,7 +6121,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   /**
    * Drops the memstore contents after replaying a flush descriptor or region open event replay
    * if the memstore edits have seqNums smaller than the given seq id
-   * @throws IOException
    */
   private MemStoreSize dropMemStoreContentsForSeqId(long seqId, HStore store) throws IOException {
     MemStoreSizing totalFreedSize = new NonThreadSafeMemStoreSizing();
@@ -6155,8 +6192,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return prepareFlushResult;
   }
 
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
-      justification="Intentional; cleared the memstore")
+  /**
+   * @deprecated Since 3.0.0, will be removed in 4.0.0. Kept only for compatibility with the old
+   *             region replica implementation.
+   */
+  @Deprecated
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY",
+    justification = "Intentional; cleared the memstore")
   void replayWALRegionEventMarker(RegionEventDescriptor regionEvent) throws IOException {
     checkTargetRegion(regionEvent.getEncodedRegionName().toByteArray(),
       "RegionEvent marker from WAL ", regionEvent);
@@ -6273,6 +6315,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
+  /**
+   * @deprecated Since 3.0.0, will be removed in 4.0.0. Kept only for compatibility with the old
+   *             region replica implementation.
+   */
+  @Deprecated
   void replayWALBulkLoadEventMarker(WALProtos.BulkLoadDescriptor bulkLoadEvent) throws IOException {
     checkTargetRegion(bulkLoadEvent.getEncodedRegionName().toByteArray(),
       "BulkLoad marker from WAL ", bulkLoadEvent);
@@ -6354,6 +6401,205 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }
 
   /**
+   * Replay a batch mutation on a secondary replica.
+   * <p/>
+   * We will directly apply the cells to the memstore. This is because:
+   * <ol>
+   * <li>All the cells come from {@link WALEdit}, so we only have {@link Put} and
+   * {@link Delete} here</li>
+   * <li>The replay is single threaded, so we do not need to acquire row locks; the region is read
+   * only, so no one else can write to it.</li>
+   * <li>We do not need to write WAL.</li>
+   * <li>We will advance MVCC in the caller directly.</li>
+   * </ol>
+   */
+  private void replayWALBatchMutate(Map<byte[], List<Cell>> family2Cells) throws IOException {
+    startRegionOperation(Operation.REPLAY_BATCH_MUTATE);
+    try {
+      for (Map.Entry<byte[], List<Cell>> entry : family2Cells.entrySet()) {
+        applyToMemStore(getStore(entry.getKey()), entry.getValue(), false, memStoreSizing);
+      }
+    } finally {
+      closeRegionOperation(Operation.REPLAY_BATCH_MUTATE);
+    }
+  }
+
+  /**
+   * Replay the meta edits, i.e., flush marker, compaction marker, bulk load marker, region event
+   * marker, etc.
+   * <p/>
+   * For all events other than start flush, we will just call {@link #refreshStoreFiles()} as the
+   * logic is straightforward and robust. For start flush, we need to snapshot the memstore, so
+   * that a later {@link #refreshStoreFiles()} call can drop the snapshot; otherwise we may run
+   * out of memory.
+   */
+  private void replayWALMetaEdit(Cell cell) throws IOException {
+    startRegionOperation(Operation.REPLAY_EVENT);
+    try {
+      FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(cell);
+      if (flushDesc != null) {
+        switch (flushDesc.getAction()) {
+          case START_FLUSH:
+            // for start flush, we need to take a snapshot of the current memstore
+            synchronized (writestate) {
+              if (!writestate.flushing) {
+                this.writestate.flushing = true;
+              } else {
+                // usually this should not happen, but let's make the code more robust; it is not
+                // a big deal to just ignore it, as the refreshStoreFiles call should be able to
+                // clean up the inconsistent state.
+                LOG.debug("NOT flushing {} as already flushing", getRegionInfo());
+                break;
+              }
+            }
+            MonitoredTask status =
+              TaskMonitor.get().createStatus("Preparing flush " + getRegionInfo());
+            Collection<HStore> storesToFlush = getStoresToFlush(flushDesc);
+            try {
+              PrepareFlushResult prepareResult =
+                internalPrepareFlushCache(null, flushDesc.getFlushSequenceNumber(), storesToFlush,
+                  status, false, FlushLifeCycleTracker.DUMMY);
+              if (prepareResult.result == null) {
+                // save the PrepareFlushResult so that we can use it later from commit flush
+                this.prepareFlushResult = prepareResult;
+                status.markComplete("Flush prepare successful");
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("{} prepared flush with seqId: {}", getRegionInfo(),
+                    flushDesc.getFlushSequenceNumber());
+                }
+              } else {
+                // special case empty memstore. We will still save the flush result in this case,
+                // since our memstore is empty, but the primary is still flushing
+                if (prepareResult.getResult()
+                  .getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
+                  this.prepareFlushResult = prepareResult;
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug("{} prepared empty flush with seqId: {}", getRegionInfo(),
+                      flushDesc.getFlushSequenceNumber());
+                  }
+                }
+                status.abort("Flush prepare failed with " + prepareResult.result);
+                // nothing much to do; prepare flush failed for some reason.
+              }
+            } finally {
+              status.cleanup();
+            }
+            break;
+          case ABORT_FLUSH:
+            // do nothing, an abort flush means the source region server will crash itself; after
+            // the primary region comes online, it will send us an open region marker, and then we
+            // can clean up the memstore.
+            synchronized (writestate) {
+              writestate.flushing = false;
+            }
+            break;
+          case COMMIT_FLUSH:
+          case CANNOT_FLUSH:
+            // just call refreshStoreFiles
+            refreshStoreFiles();
+            logRegionFiles();
+            synchronized (writestate) {
+              writestate.flushing = false;
+            }
+            break;
+          default:
+            LOG.warn("{} received a flush event with unknown action: {}", getRegionInfo(),
+              TextFormat.shortDebugString(flushDesc));
+        }
+      } else {
+        // for all other region events, we will do a refreshStoreFiles
+        refreshStoreFiles();
+        logRegionFiles();
+      }
+    } finally {
+      closeRegionOperation(Operation.REPLAY_EVENT);
+    }
+  }
+
+  /**
+   * Replay a remote WAL entry sent by the primary replica.
+   * <p/>
+   * Should only be called on secondary replicas.
+   */
+  void replayWALEntry(WALEntry entry, CellScanner cells) throws IOException {
+    long timeout = -1L;
+    Optional<RpcCall> call = RpcServer.getCurrentCall();
+    if (call.isPresent()) {
+      long deadline = call.get().getDeadline();
+      if (deadline < Long.MAX_VALUE) {
+        timeout = deadline - EnvironmentEdgeManager.currentTime();
+        if (timeout <= 0) {
+          throw new TimeoutIOException("Timeout while replaying edits for " + getRegionInfo());
+        }
+      }
+    }
+    if (timeout > 0) {
+      try {
+        if (!replayLock.tryLock(timeout, TimeUnit.MILLISECONDS)) {
+          throw new TimeoutIOException(
+            "Timeout while waiting for lock when replaying edits for " + getRegionInfo());
+        }
+      } catch (InterruptedException e) {
+        throw throwOnInterrupt(e);
+      }
+    } else {
+      replayLock.lock();
+    }
+    try {
+      int count = entry.getAssociatedCellCount();
+      long sequenceId = entry.getKey().getLogSequenceNumber();
+      if (lastReplayedSequenceId >= sequenceId) {
+        // we have already replayed this edit, so skip it. But remember to advance the
+        // CellScanner, as one request may carry multiple WALEntries and we may still need to
+        // apply later ones
+        for (int i = 0; i < count; i++) {
+          // Throw index out of bounds if our cell count is off
+          if (!cells.advance()) {
+            throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
+          }
+        }
+        return;
+      }
+      Map<byte[], List<Cell>> family2Cells = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+      for (int i = 0; i < count; i++) {
+        // Throw index out of bounds if our cell count is off
+        if (!cells.advance()) {
+          throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
+        }
+        Cell cell = cells.current();
+        if (WALEdit.isMetaEditFamily(cell)) {
+          // If there is a meta edit, i.e., we have done a flush/compaction/open, then we need to
+          // apply the previous cells first, and then replay the special meta edit. The meta edit
+          // acts like a barrier; we need to keep the order. For example, the flush marker contains
+          // a flush sequence number, which makes it possible to drop memstore content, but if we
+          // first apply edits with a greater sequence id, then we cannot drop the memstore content
+          // when replaying the flush marker, which is bad as we could run out of memory.
+          // Usually a meta edit will have its own WALEntry, so this is just safeguard logic to
+          // make sure we do not break things in the worst case.
+          if (!family2Cells.isEmpty()) {
+            replayWALBatchMutate(family2Cells);
+            family2Cells.clear();
+          }
+          replayWALMetaEdit(cell);
+        } else {
+          family2Cells
+            .computeIfAbsent(CellUtil.cloneFamily(cell), k -> new ArrayList<>())
+            .add(cell);
+        }
+      }
+      // do not forget to apply the remaining cells
+      if (!family2Cells.isEmpty()) {
+        replayWALBatchMutate(family2Cells);
+      }
+      mvcc.advanceTo(sequenceId);
+      lastReplayedSequenceId = sequenceId;
+    } finally {
+      replayLock.unlock();
+    }
+  }
+
+  /**
    * If all stores ended up dropping their snapshots, we can safely drop the prepareFlushResult
    */
   private void dropPrepareFlushIfPossible() {
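
A note on the locking in replayWALEntry above: the replay lock is acquired with a bounded wait derived from the RPC deadline, so a slow replay cannot pin handler threads forever. A self-contained sketch of the pattern using only JDK types (names are illustrative, not HBase API):

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class DeadlineBoundedLock {
      private final Lock replayLock = new ReentrantLock();

      // Wait for the lock no longer than the caller's remaining deadline.
      void withDeadline(long deadlineMillis, Runnable replay) throws InterruptedException {
        long timeout = deadlineMillis - System.currentTimeMillis();
        if (timeout <= 0 || !replayLock.tryLock(timeout, TimeUnit.MILLISECONDS)) {
          throw new IllegalStateException("timed out waiting for replay lock");
        }
        try {
          replay.run(); // single-threaded replay; protects lastReplayedSequenceId
        } finally {
          replayLock.unlock();
        }
      }
    }
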
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 724da1a..22357f2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1088,15 +1088,15 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
   /**
    * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
    * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
-   * @param region
-   * @param mutations
-   * @param replaySeqId
    * @return an array of OperationStatus which internally contains the OperationStatusCode and the
    *         exceptionMessage if any
-   * @throws IOException
+   * @deprecated Since 3.0.0, will be removed in 4.0.0. We do not use this method for replaying
+   *             edits for secondary replicas any more, see
+   *             {@link #replicateToReplica(RpcController, ReplicateWALEntryRequest)}.
    */
-  private OperationStatus [] doReplayBatchOp(final HRegion region,
-      final List<MutationReplay> mutations, long replaySeqId) throws IOException {
+  @Deprecated
+  private OperationStatus[] doReplayBatchOp(final HRegion region,
+    final List<MutationReplay> mutations, long replaySeqId) throws IOException {
     long before = EnvironmentEdgeManager.currentTime();
     boolean batchContainsPuts = false, batchContainsDelete = false;
     try {
@@ -2075,21 +2075,30 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
     return response;
   }
 
+  private CellScanner getAndReset(RpcController controller) {
+    HBaseRpcController hrc = (HBaseRpcController) controller;
+    CellScanner cells = hrc.cellScanner();
+    hrc.setCellScanner(null);
+    return cells;
+  }
+
   /**
    * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
    * that the given mutations will be durable on the receiving RS if this method returns without any
    * exception.
    * @param controller the RPC controller
    * @param request the request
-   * @throws ServiceException
+   * @deprecated Since 3.0.0, will be removed in 4.0.0. No longer used; kept only for
+   *             compatibility with the old region replica implementation. Use the
+   *             {@code replicateToReplica} method instead.
    */
+  @Deprecated
   @Override
   @QosPriority(priority = HConstants.REPLAY_QOS)
   public ReplicateWALEntryResponse replay(final RpcController controller,
-      final ReplicateWALEntryRequest request) throws ServiceException {
+    final ReplicateWALEntryRequest request) throws ServiceException {
     long before = EnvironmentEdgeManager.currentTime();
-    CellScanner cells = ((HBaseRpcController) controller).cellScanner();
-    ((HBaseRpcController) controller).setCellScanner(null);
+    CellScanner cells = getAndReset(controller);
     try {
       checkOpen();
       List<WALEntry> entries = request.getEntryList();
@@ -2176,6 +2185,41 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
     }
   }
 
+  /**
+   * Replay the given changes on a secondary replica
+   */
+  @Override
+  public ReplicateWALEntryResponse replicateToReplica(RpcController controller,
+    ReplicateWALEntryRequest request) throws ServiceException {
+    CellScanner cells = getAndReset(controller);
+    try {
+      checkOpen();
+      List<WALEntry> entries = request.getEntryList();
+      if (entries == null || entries.isEmpty()) {
+        // empty input
+        return ReplicateWALEntryResponse.newBuilder().build();
+      }
+      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
+      HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
+      if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
+        throw new DoNotRetryIOException(
+          "Should not replicate to primary replica " + region.getRegionInfo() + ", CODE BUG?");
+      }
+      for (WALEntry entry : entries) {
+        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
+          throw new NotServingRegionException(
+            "ReplicateToReplica request contains entries from multiple " +
+              "regions. First region:" + regionName.toStringUtf8() + " , other region:" +
+              entry.getKey().getEncodedRegionName());
+        }
+        region.replayWALEntry(entry, cells);
+      }
+      return ReplicateWALEntryResponse.newBuilder().build();
+    } catch (IOException ie) {
+      throw new ServiceException(ie);
+    }
+  }
+
   private void checkShouldRejectReplicationRequest(List<WALEntry> entries) throws IOException {
     ReplicationSourceService replicationSource = server.getReplicationSourceService();
     if (replicationSource == null || entries.isEmpty()) {
@@ -2209,8 +2253,7 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
         requestCount.increment();
         List<WALEntry> entries = request.getEntryList();
         checkShouldRejectReplicationRequest(entries);
-        CellScanner cellScanner = ((HBaseRpcController) controller).cellScanner();
-        ((HBaseRpcController) controller).setCellScanner(null);
+        CellScanner cellScanner = getAndReset(controller);
         server.getRegionServerCoprocessorHost().preReplicateLogEntries();
         server.getReplicationSinkService().replicateLogEntries(entries, cellScanner,
           request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
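
One subtlety in the replicateToReplica handler above: every WALEntry in a request shares a single CellScanner, which is why replayWALEntry must consume exactly getAssociatedCellCount() cells per entry even when it skips an already-replayed entry. A toy illustration of that invariant in plain Java (hypothetical types, not HBase API):

    import java.util.Iterator;
    import java.util.List;

    public class SharedScannerInvariant {
      // cellCountPerEntry.get(i) = getAssociatedCellCount() of entry i; all entries
      // read from the same shared stream of cells.
      static void replay(List<Integer> cellCountPerEntry, List<Boolean> alreadyReplayed,
          Iterator<String> sharedCells) {
        for (int i = 0; i < cellCountPerEntry.size(); i++) {
          for (int c = 0; c < cellCountPerEntry.get(i); c++) {
            if (!sharedCells.hasNext()) {
              throw new ArrayIndexOutOfBoundsException(
                "Expected=" + cellCountPerEntry.get(i) + ", index=" + c);
            }
            String cell = sharedCells.next(); // always consume, even when skipping
            if (!alreadyReplayed.get(i)) {
              System.out.println("apply " + cell); // stand-in for the memstore write
            }
          }
        }
      }
    }
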
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationFlushRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationFlushRequester.java
index 960f57e..3431324 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationFlushRequester.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationFlushRequester.java
@@ -120,6 +120,7 @@ class RegionReplicationFlushRequester {
     }
     // schedule a timer task
     HashedWheelTimer timer = getTimer();
+    pendingFlushRequestSequenceId = sequenceId;
     pendingFlushRequest =
       timer.newTimeout(this::flush, minIntervalSecs - elapsedSecs, TimeUnit.SECONDS);
   }
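
The one-line change above records which sequence id the pending timer task will cover, so later requests for the same or a smaller id can be coalesced. A minimal sketch of the idea with a plain ScheduledExecutorService standing in for the HashedWheelTimer (illustrative class, not the real HBase implementation; the surrounding logic here is assumed, not taken from the commit):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;

    public class CoalescingFlushRequester {
      private final ScheduledExecutorService timer =
        Executors.newSingleThreadScheduledExecutor();
      private ScheduledFuture<?> pendingFlushRequest;
      private long pendingFlushRequestSequenceId = -1;

      synchronized void requestFlush(long sequenceId, Runnable flush, long delaySecs) {
        if (pendingFlushRequest != null && pendingFlushRequestSequenceId >= sequenceId) {
          return; // the already-scheduled flush covers this sequence id too
        }
        // record the covered sequence id together with the scheduled task
        // (a real implementation would also cancel/replace the pending task)
        pendingFlushRequestSequenceId = sequenceId;
        pendingFlushRequest = timer.schedule(flush, delaySecs, TimeUnit.SECONDS);
      }
    }
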
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
index d5e2387..f0129b7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
@@ -22,7 +22,6 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -32,6 +31,7 @@ import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import org.agrona.collections.IntHashSet;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -54,6 +54,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
 
 /**
  * The class for replicating WAL edits to secondary replicas, one instance per region.
@@ -128,7 +129,7 @@ public class RegionReplicationSink {
   // when we get a flush all request, we will try to remove a replica from this map, the key point
   // here is the flush sequence number must be greater than the failed sequence id, otherwise we
   // should not remove the replica from this map
-  private final Map<Integer, Long> failedReplicas = new HashMap<>();
+  private final IntHashSet failedReplicas;
 
   private final Queue<SinkEntry> entries = new ArrayDeque<>();
 
@@ -165,6 +166,7 @@ public class RegionReplicationSink {
       TimeUnit.MILLISECONDS.toNanos(conf.getLong(RPC_TIMEOUT_MS, RPC_TIMEOUT_MS_DEFAULT));
     this.operationTimeoutNs = TimeUnit.MILLISECONDS
       .toNanos(conf.getLong(OPERATION_TIMEOUT_MS, OPERATION_TIMEOUT_MS_DEFAULT));
+    this.failedReplicas = new IntHashSet(regionReplication - 1);
   }
 
   private void onComplete(List<SinkEntry> sent,
@@ -184,16 +186,16 @@ public class RegionReplicationSink {
       if (error != null) {
         if (maxSequenceId > lastFlushedSequenceId) {
           LOG.warn(
-            "Failed to replicate to secondary replica {} for {}, since the max sequence"
-              + " id of sunk entris is {}, which is greater than the last flush SN {},"
-              + " we will stop replicating for a while and trigger a flush",
+            "Failed to replicate to secondary replica {} for {}, since the max sequence" +
+              " id of sunk entris is {}, which is greater than the last flush SN {}," +
+              " we will stop replicating for a while and trigger a flush",
             replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
           failed.add(replicaId);
         } else {
           LOG.warn(
-            "Failed to replicate to secondary replica {} for {}, since the max sequence"
-              + " id of sunk entris is {}, which is less than or equal to the last flush SN {},"
-              + " we will not stop replicating",
+            "Failed to replicate to secondary replica {} for {}, since the max sequence" +
+              " id of sunk entris is {}, which is less than or equal to the last flush SN {}," +
+              " we will not stop replicating",
             replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
         }
       }
@@ -201,9 +203,7 @@ public class RegionReplicationSink {
     synchronized (entries) {
       pendingSize -= toReleaseSize;
       if (!failed.isEmpty()) {
-        for (Integer replicaId : failed) {
-          failedReplicas.put(replicaId, maxSequenceId);
-        }
+        failedReplicas.addAll(failed);
         flushRequester.requestFlush(maxSequenceId);
       }
       sending = false;
@@ -237,7 +237,7 @@ public class RegionReplicationSink {
     AtomicInteger remaining = new AtomicInteger(toSendReplicaCount);
     Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
     for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
-      if (failedReplicas.containsKey(replicaId)) {
+      if (failedReplicas.contains(replicaId)) {
         continue;
       }
       MutableObject<Throwable> error = new MutableObject<>();
@@ -253,7 +253,15 @@ public class RegionReplicationSink {
     }
   }
 
-  private boolean isFlushAllStores(FlushDescriptor flushDesc) {
+  private boolean isStartFlushAllStores(FlushDescriptor flushDesc) {
+    if (flushDesc.getAction() == FlushAction.CANNOT_FLUSH) {
+      // this means the memstore is empty, which means all data before this sequence id has been
+      // flushed out, so it is equivalent to a flush-all; return true
+      return true;
+    }
+    if (flushDesc.getAction() != FlushAction.START_FLUSH) {
+      return false;
+    }
     Set<byte[]> storesFlushed =
       flushDesc.getStoreFlushesList().stream().map(sfd -> sfd.getFamilyName().toByteArray())
         .collect(Collectors.toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR)));
@@ -263,7 +271,7 @@ public class RegionReplicationSink {
     return storesFlushed.containsAll(tableDesc.getColumnFamilyNames());
   }
 
-  private Optional<FlushDescriptor> getFlushAllDescriptor(Cell metaCell) {
+  private Optional<FlushDescriptor> getStartFlushAllDescriptor(Cell metaCell) {
     if (!CellUtil.matchingFamily(metaCell, WALEdit.METAFAMILY)) {
       return Optional.empty();
     }
@@ -274,14 +282,14 @@ public class RegionReplicationSink {
       LOG.warn("Failed to parse FlushDescriptor from {}", metaCell);
       return Optional.empty();
     }
-    if (flushDesc != null && isFlushAllStores(flushDesc)) {
+    if (flushDesc != null && isStartFlushAllStores(flushDesc)) {
       return Optional.of(flushDesc);
     } else {
       return Optional.empty();
     }
   }
 
-  private void clearAllEntries() {
+  private long clearAllEntries() {
     long toClearSize = 0;
     for (SinkEntry entry : entries) {
       toClearSize += entry.size;
@@ -290,20 +298,7 @@ public class RegionReplicationSink {
     entries.clear();
     pendingSize -= toClearSize;
     manager.decrease(toClearSize);
-  }
-
-  private void clearFailedReplica(long flushSequenceNumber) {
-    for (Iterator<Map.Entry<Integer, Long>> iter = failedReplicas.entrySet().iterator(); iter
-      .hasNext();) {
-      Map.Entry<Integer, Long> entry = iter.next();
-      if (entry.getValue().longValue() < flushSequenceNumber) {
-        LOG.debug(
-          "Got a flush all request with sequence id {}, clear failed replica {}" +
-            " with last failed sequence id {}",
-          flushSequenceNumber, entry.getKey(), entry.getValue());
-        iter.remove();
-      }
-    }
+    return toClearSize;
   }
 
   /**
@@ -325,32 +320,20 @@ public class RegionReplicationSink {
         // check whether we flushed all stores, which means we could drop all the previous edits,
         // and also, recover from the previous failure of some replicas
         for (Cell metaCell : edit.getCells()) {
-          getFlushAllDescriptor(metaCell).ifPresent(flushDesc -> {
+          getStartFlushAllDescriptor(metaCell).ifPresent(flushDesc -> {
             long flushSequenceNumber = flushDesc.getFlushSequenceNumber();
-            int toClearCount = 0;
-            long toClearSize = 0;
-            for (;;) {
-              SinkEntry e = entries.peek();
-              if (e == null) {
-                break;
-              }
-              if (e.key.getSequenceId() < flushSequenceNumber) {
-                entries.poll();
-                toClearCount++;
-                toClearSize += e.size;
-              } else {
-                break;
-              }
-            }
             lastFlushedSequenceId = flushSequenceNumber;
+            long clearedCount = entries.size();
+            long clearedSize = clearAllEntries();
             if (LOG.isDebugEnabled()) {
               LOG.debug(
-                "Got a flush all request with sequence id {}, clear {} pending"
-                  + " entries with size {}",
-                flushSequenceNumber, toClearCount,
-                StringUtils.TraditionalBinaryPrefix.long2String(toClearSize, "", 1));
+                "Got a flush all request with sequence id {}, clear {} pending" +
+                  " entries with size {}, clear failed replicas {}",
+                flushSequenceNumber, clearedCount,
+                StringUtils.TraditionalBinaryPrefix.long2String(clearedSize, "", 1),
+                failedReplicas);
             }
-            clearFailedReplica(flushSequenceNumber);
+            failedReplicas.clear();
             flushRequester.recordFlush(flushSequenceNumber);
           });
         }
@@ -371,7 +354,7 @@ public class RegionReplicationSink {
         // failed
         clearAllEntries();
         for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
-          failedReplicas.put(replicaId, entry.key.getSequenceId());
+          failedReplicas.add(replicaId);
         }
         flushRequester.requestFlush(entry.key.getSequenceId());
       }
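
The sink now tracks failed replicas with Agrona's primitive IntHashSet rather than a Map<Integer, Long>: under the new replay path a start-flush-all covers every earlier edit, so there is no per-replica failed sequence id left to compare and the whole set is simply cleared. A small sketch of the simplified bookkeeping (assumes org.agrona on the classpath):

    import org.agrona.collections.IntHashSet;

    public class FailedReplicaTracking {
      public static void main(String[] args) {
        int regionReplication = 3;
        IntHashSet failedReplicas = new IntHashSet(regionReplication - 1);
        failedReplicas.add(1);                          // replica 1 failed to sink
        System.out.println(failedReplicas.contains(1)); // true -> skip it when sending
        failedReplicas.clear();                         // a start-flush-all recovers all replicas
        System.out.println(failedReplicas.isEmpty());   // true
      }
    }
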
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
index 27c86c2..e84195b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
@@ -466,7 +466,9 @@ public final class WALSplitUtil {
    * @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances
    *          extracted from the passed in WALEntry.
    * @return list of Pair&lt;MutationType, Mutation&gt; to be replayed
+   * @deprecated Since 3.0.0, will be removed in 4.0.0.
    */
+  @Deprecated
   public static List<MutationReplay> getMutationsFromWALEntry(AdminProtos.WALEntry entry,
       CellScanner cells, Pair<WALKey, WALEdit> logEntry, Durability durability) throws IOException {
     if (entry == null) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 56813af..4e5cd28 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -755,4 +755,10 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
   public RegionReplicationBufferManager getRegionReplicationBufferManager() {
     return null;
   }
+
+  @Override
+  public ReplicateWALEntryResponse replicateToReplica(RpcController controller,
+    ReplicateWALEntryRequest request) throws ServiceException {
+    return null;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index 1db4bb8..6214b31 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -62,7 +62,6 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.executor.ExecutorService;
-import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
 import org.apache.hadoop.hbase.executor.ExecutorType;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
@@ -113,6 +112,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescript
  * Tests of HRegion methods for replaying flush, compaction, region open, etc events for secondary
  * region replicas
  */
+@SuppressWarnings("deprecation")
 @Category(LargeTests.class)
 public class TestHRegionReplayEvents {
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReplicateToReplica.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReplicateToReplica.java
new file mode 100644
index 0000000..d9f846d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReplicateToReplica.java
@@ -0,0 +1,388 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNameTestRule;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.executor.ExecutorType;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
+import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
+import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestReplicateToReplica {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestReplicateToReplica.class);
+
+  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+  private static byte[] FAMILY = Bytes.toBytes("family");
+
+  private static byte[] QUAL = Bytes.toBytes("qualifier");
+
+  private static ExecutorService EXEC;
+
+  @Rule
+  public final TableNameTestRule name = new TableNameTestRule();
+
+  private TableName tableName;
+
+  private Path testDir;
+
+  private TableDescriptor td;
+
+  private RegionServerServices rss;
+
+  private AsyncClusterConnection conn;
+
+  private RegionReplicationBufferManager manager;
+
+  private FlushRequester flushRequester;
+
+  private HRegion primary;
+
+  private HRegion secondary;
+
+  private WALFactory walFactory;
+
+  private boolean queueReqAndResps;
+
+  private Queue<Pair<List<WAL.Entry>, CompletableFuture<Void>>> reqAndResps;
+
+  private static List<Put> TO_ADD_AFTER_PREPARE_FLUSH;
+
+  public static final class HRegionForTest extends HRegion {
+
+    public HRegionForTest(HRegionFileSystem fs, WAL wal, Configuration confParam,
+      TableDescriptor htd, RegionServerServices rsServices) {
+      super(fs, wal, confParam, htd, rsServices);
+    }
+
+    @SuppressWarnings("deprecation")
+    public HRegionForTest(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
+      RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
+      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
+    }
+
+    @Override
+    protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid,
+      Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker,
+      FlushLifeCycleTracker tracker) throws IOException {
+      PrepareFlushResult result = super.internalPrepareFlushCache(wal, myseqid, storesToFlush,
+        status, writeFlushWalMarker, tracker);
+      for (Put put : TO_ADD_AFTER_PREPARE_FLUSH) {
+        put(put);
+      }
+      TO_ADD_AFTER_PREPARE_FLUSH.clear();
+      return result;
+    }
+
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() {
+    Configuration conf = UTIL.getConfiguration();
+    conf.setInt("hbase.region.read-replica.sink.flush.min-interval.secs", 1);
+    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
+    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
+    conf.setClass(HConstants.REGION_IMPL, HRegionForTest.class, HRegion.class);
+    EXEC = new ExecutorService("test");
+    EXEC.startExecutorService(EXEC.new ExecutorConfig().setCorePoolSize(1)
+      .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER));
+    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
+      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() {
+    EXEC.shutdown();
+    UTIL.cleanupTestDir();
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    TO_ADD_AFTER_PREPARE_FLUSH = new ArrayList<>();
+    tableName = name.getTableName();
+    testDir = UTIL.getDataTestDir(tableName.getNameAsString());
+    Configuration conf = UTIL.getConfiguration();
+    conf.set(HConstants.HBASE_DIR, testDir.toString());
+
+    td = TableDescriptorBuilder.newBuilder(tableName)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(2)
+      .setRegionMemStoreReplication(true).build();
+
+    reqAndResps = new ArrayDeque<>();
+    queueReqAndResps = true;
+    conn = mock(AsyncClusterConnection.class);
+    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong())).thenAnswer(i -> {
+      if (queueReqAndResps) {
+        @SuppressWarnings("unchecked")
+        List<WAL.Entry> entries = i.getArgument(1, List.class);
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        reqAndResps.add(Pair.newPair(entries, future));
+        return future;
+      } else {
+        return CompletableFuture.completedFuture(null);
+      }
+    });
+
+    flushRequester = mock(FlushRequester.class);
+
+    rss = mock(RegionServerServices.class);
+    when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
+    when(rss.getConfiguration()).thenReturn(conf);
+    when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(conf));
+    when(rss.getExecutorService()).thenReturn(EXEC);
+    when(rss.getAsyncClusterConnection()).thenReturn(conn);
+    when(rss.getFlushRequester()).thenReturn(flushRequester);
+
+    manager = new RegionReplicationBufferManager(rss);
+    when(rss.getRegionReplicationBufferManager()).thenReturn(manager);
+
+    RegionInfo primaryHri = RegionInfoBuilder.newBuilder(td.getTableName()).build();
+    RegionInfo secondaryHri = RegionReplicaUtil.getRegionInfoForReplica(primaryHri, 1);
+
+    walFactory = new WALFactory(conf, UUID.randomUUID().toString());
+    WAL wal = walFactory.getWAL(primaryHri);
+    primary = HRegion.createHRegion(primaryHri, testDir, conf, td, wal);
+    primary.close();
+
+    primary = HRegion.openHRegion(testDir, primaryHri, td, wal, conf, rss, null);
+    secondary = HRegion.openHRegion(secondaryHri, td, null, conf, rss, null);
+
+    when(rss.getRegions()).then(i -> {
+      return Arrays.asList(primary, secondary);
+    });
+
+    // process the open events
+    replicateAll();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    // closing the region will issue a flush, which will enqueue an edit into the replication
+    // sink, so we need to complete it, otherwise the test will hang
+    queueReqAndResps = false;
+    failAll();
+    HBaseTestingUtil.closeRegionAndWAL(primary);
+    HBaseTestingUtil.closeRegionAndWAL(secondary);
+    if (walFactory != null) {
+      walFactory.close();
+    }
+  }
+
+  private FlushResult flushPrimary() throws IOException {
+    return primary.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
+  }
+
+  private void replicate(Pair<List<WAL.Entry>, CompletableFuture<Void>> pair) throws IOException {
+    Pair<ReplicateWALEntryRequest, CellScanner> params = ReplicationProtobufUtil
+      .buildReplicateWALEntryRequest(pair.getFirst().toArray(new WAL.Entry[0]),
+        secondary.getRegionInfo().getEncodedNameAsBytes(), null, null, null);
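+    // replay each entry into the secondary region, then complete the future so the replication
+    // sink moves on to the next batch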
+    for (WALEntry entry : params.getFirst().getEntryList()) {
+      secondary.replayWALEntry(entry, params.getSecond());
+    }
+    pair.getSecond().complete(null);
+  }
+
+  private void replicateOne() throws IOException {
+    replicate(reqAndResps.remove());
+  }
+
+  private void replicateAll() throws IOException {
+    for (Pair<List<WAL.Entry>, CompletableFuture<Void>> pair;;) {
+      pair = reqAndResps.poll();
+      if (pair == null) {
+        break;
+      }
+      replicate(pair);
+    }
+  }
+
+  private void failOne() {
+    reqAndResps.remove().getSecond().completeExceptionally(new IOException("Inject error"));
+  }
+
+  private void failAll() {
+    for (Pair<List<WAL.Entry>, CompletableFuture<Void>> pair;;) {
+      pair = reqAndResps.poll();
+      if (pair == null) {
+        break;
+      }
+      pair.getSecond().completeExceptionally(new IOException("Inject error"));
+    }
+  }
+
+  @Test
+  public void testNormalReplicate() throws IOException {
+    byte[] row = Bytes.toBytes(0);
+    primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
+    replicateOne();
+    assertEquals(1, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
+  }
+
+  @Test
+  public void testNormalFlush() throws IOException {
+    byte[] row = Bytes.toBytes(0);
+    primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
+    TO_ADD_AFTER_PREPARE_FLUSH.add(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(2)));
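+    // the second put is applied after the snapshot is taken, so it stays in the memstore and is
+    // replicated as a normal edit between the flush markers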
+    flushPrimary();
+    replicateAll();
+    assertEquals(2, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
+
+    // we should have the same memstore size, i.e., the secondary should also have dropped the
+    // snapshot
+    assertEquals(primary.getMemStoreDataSize(), secondary.getMemStoreDataSize());
+  }
+
+  @Test
+  public void testErrorBeforeFlushStart() throws IOException {
+    byte[] row = Bytes.toBytes(0);
+    primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
+    failOne();
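+    // a failed replicate call marks the replica as failed, which triggers a flush request so the
+    // secondary can catch up from the flush markers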
+    verify(flushRequester, times(1)).requestFlush(any(), anyList(), any());
+    TO_ADD_AFTER_PREPARE_FLUSH.add(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(2)));
+    flushPrimary();
+    // this also tests starting a flush with an empty memstore on the secondary replica side
+    replicateAll();
+    assertEquals(2, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
+    assertEquals(primary.getMemStoreDataSize(), secondary.getMemStoreDataSize());
+  }
+
+  @Test
+  public void testErrorAfterFlushStartBeforeFlushCommit() throws IOException {
+    primary.put(new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
+    replicateAll();
+    TO_ADD_AFTER_PREPARE_FLUSH
+      .add(new Put(Bytes.toBytes(1)).addColumn(FAMILY, QUAL, Bytes.toBytes(2)));
+    flushPrimary();
+    // replicate the start flush edit
+    replicateOne();
+    // fail the remaining edits: the put and the commit flush edit
+    failOne();
+    verify(flushRequester, times(1)).requestFlush(any(), anyList(), any());
+    primary.put(new Put(Bytes.toBytes(2)).addColumn(FAMILY, QUAL, Bytes.toBytes(3)));
+    flushPrimary();
+    replicateAll();
+    for (int i = 0; i < 3; i++) {
+      assertEquals(i + 1,
+        Bytes.toInt(secondary.get(new Get(Bytes.toBytes(i))).getValue(FAMILY, QUAL)));
+    }
+    // should have nothing in memstore
+    assertEquals(0, secondary.getMemStoreDataSize());
+  }
+
+  @Test
+  public void testCatchUpWithCannotFlush() throws IOException, InterruptedException {
+    byte[] row = Bytes.toBytes(0);
+    primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
+    failOne();
+    verify(flushRequester, times(1)).requestFlush(any(), anyList(), any());
+    flushPrimary();
+    failAll();
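+    // wait out the 1 second sink flush min interval configured in setUpBeforeClass, after which
+    // the sink should request another flush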
+    Thread.sleep(2000);
+    // we will request a flush for the second time
+    verify(flushRequester, times(2)).requestFlush(any(), anyList(), any());
+    // we cannot flush because there is no content in the memstore
+    FlushResult result = flushPrimary();
+    assertEquals(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.getResult());
+    // the secondary replica does not have this row yet
+    assertFalse(secondary.get(new Get(row).setCheckExistenceOnly(true)).getExists().booleanValue());
+    // replicate the 'cannot flush' marker edit
+    replicateOne();
+    // we should have the row now
+    assertEquals(1, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
+  }
+
+  @Test
+  public void testCatchUpWithReopen() throws IOException {
+    byte[] row = Bytes.toBytes(0);
+    primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
+    failOne();
+    primary.close();
+    // the secondary replica does not have this row yet, although the above close has flushed the
+    // data out
+    assertFalse(secondary.get(new Get(row).setCheckExistenceOnly(true)).getExists().booleanValue());
+
+    // reopen
+    primary = HRegion.openHRegion(testDir, primary.getRegionInfo(), td, primary.getWAL(),
+      UTIL.getConfiguration(), rss, null);
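+    // the region open event written on reopen carries the committed store files, so replicating
+    // it lets the secondary pick up the data flushed by the close above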
+    replicateAll();
+    // we should have the row now
+    assertEquals(1, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
index 76a224b..918f644 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
@@ -236,7 +236,7 @@ public class TestRegionReplicationSink {
         throw new IllegalStateException();
       }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
     FlushDescriptor fd =
-      ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 2L, committedFiles);
+      ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, primary, 2L, committedFiles);
     WALEdit edit2 = WALEdit.createFlushWALEdit(primary, fd);
     sink.add(key2, edit2, rpcCall2);
 
@@ -300,7 +300,7 @@ public class TestRegionReplicationSink {
         throw new IllegalStateException();
       }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
     FlushDescriptor fd =
-      ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 2L, committedFiles);
+      ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, primary, 2L, committedFiles);
     WALEdit edit3 = WALEdit.createFlushWALEdit(primary, fd);
     sink.add(key3, edit3, rpcCall3);
 
@@ -313,91 +313,4 @@ public class TestRegionReplicationSink {
     // should have sent out everything, so no pending entries.
     assertEquals(0, sink.pendingSize());
   }
-
-  @Test
-  public void testNotClearFailedReplica() {
-    // simulate this scenario:
-    // 1. prepare flush
-    // 2. add one edit broken
-    // 3. commit flush with flush sequence number less than the previous edit (this is the normal
-    // case)
-    // we should not clear the failed replica as we do not flush the broken edit out with this
-    // flush, we need an extra flush to flush it out
-    MutableInt next = new MutableInt(0);
-    List<CompletableFuture<Void>> futures =
-      Stream.generate(() -> new CompletableFuture<Void>()).limit(8).collect(Collectors.toList());
-    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
-      .then(i -> futures.get(next.getAndIncrement()));
-    when(manager.increase(anyLong())).thenReturn(true);
-
-    ServerCall<?> rpcCall1 = mock(ServerCall.class);
-    WALKeyImpl key1 = mock(WALKeyImpl.class);
-    when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
-    when(key1.getSequenceId()).thenReturn(1L);
-    Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream()
-      .collect(Collectors.toMap(Function.identity(), k -> Collections.emptyList(), (u, v) -> {
-        throw new IllegalStateException();
-      }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
-    FlushDescriptor fd =
-      ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, primary, 1L, committedFiles);
-    WALEdit edit1 = WALEdit.createFlushWALEdit(primary, fd);
-    sink.add(key1, edit1, rpcCall1);
-
-    futures.get(0).complete(null);
-    futures.get(1).complete(null);
-
-    ServerCall<?> rpcCall2 = mock(ServerCall.class);
-    WALKeyImpl key2 = mock(WALKeyImpl.class);
-    when(key2.estimatedSerializedSizeOf()).thenReturn(200L);
-    when(key2.getSequenceId()).thenReturn(2L);
-    WALEdit edit2 = mock(WALEdit.class);
-    when(edit2.estimatedSerializedSizeOf()).thenReturn(2000L);
-    sink.add(key2, edit2, rpcCall2);
-
-    // fail the call to replica 1
-    futures.get(2).completeExceptionally(new IOException("inject error"));
-    futures.get(3).complete(null);
-
-    ServerCall<?> rpcCall3 = mock(ServerCall.class);
-    WALKeyImpl key3 = mock(WALKeyImpl.class);
-    when(key3.estimatedSerializedSizeOf()).thenReturn(300L);
-    when(key3.getSequenceId()).thenReturn(3L);
-    FlushDescriptor fd3 =
-      ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 1L, committedFiles);
-    WALEdit edit3 = WALEdit.createFlushWALEdit(primary, fd3);
-    sink.add(key3, edit3, rpcCall3);
-
-    // we should only call replicate once for edit3, since replica 1 is marked as failed, and the
-    // flush request can not clean the failed replica since the flush sequence number is not greater
-    // than sequence id of the last failed edit
-    verify(conn, times(5)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
-    futures.get(4).complete(null);
-
-    ServerCall<?> rpcCall4 = mock(ServerCall.class);
-    WALKeyImpl key4 = mock(WALKeyImpl.class);
-    when(key4.estimatedSerializedSizeOf()).thenReturn(400L);
-    when(key4.getSequenceId()).thenReturn(4L);
-    WALEdit edit4 = mock(WALEdit.class);
-    when(edit4.estimatedSerializedSizeOf()).thenReturn(4000L);
-    sink.add(key4, edit4, rpcCall4);
-
-    // still, only send to replica 2
-    verify(conn, times(6)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
-    futures.get(5).complete(null);
-
-    ServerCall<?> rpcCall5 = mock(ServerCall.class);
-    WALKeyImpl key5 = mock(WALKeyImpl.class);
-    when(key5.estimatedSerializedSizeOf()).thenReturn(300L);
-    when(key5.getSequenceId()).thenReturn(3L);
-    FlushDescriptor fd5 =
-      ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 4L, committedFiles);
-    WALEdit edit5 = WALEdit.createFlushWALEdit(primary, fd5);
-    sink.add(key5, edit5, rpcCall5);
-
-    futures.get(6).complete(null);
-    futures.get(7).complete(null);
-    // should have cleared the failed replica because the flush sequence number is greater than
-    // the sequence id of the last failed edit
-    verify(conn, times(8)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
-  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java
index a4da640..2fcfc29 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java
@@ -89,7 +89,6 @@ public class TestMetaRegionReplicaReplication {
     conf.setInt("zookeeper.recovery.retry", 1);
     conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
     conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
-    conf.setInt("replication.stats.thread.period.seconds", 5);
     conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
     conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
     // Enable hbase:meta replication.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
index 231c9e1..ac279ed 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
@@ -85,7 +85,6 @@ public class TestRegionReplicaReplication {
     conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
     conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
     conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
-    conf.setInt("replication.stats.thread.period.seconds", 5);
     conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
     conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // less number of retries is needed
     conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);