You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2022/06/03 08:39:23 UTC

[hbase] branch master updated: HBASE-26993 Make the new framework for region replication could work for SKIP_WAL (#4392)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new d57159f31cb HBASE-26993 Make the new framework for region replication could work for SKIP_WAL (#4392)
d57159f31cb is described below

commit d57159f31cb7be4d9ced0d7b95e2c78c43d160a1
Author: chenglei <ch...@apache.org>
AuthorDate: Fri Jun 3 16:39:17 2022 +0800

    HBASE-26993 Make the new framework for region replication could work for SKIP_WAL (#4392)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../apache/hadoop/hbase/regionserver/HRegion.java  | 258 +++++++++++++++++----
 .../regionserver/MiniBatchOperationInProgress.java |  15 ++
 .../java/org/apache/hadoop/hbase/wal/WALEdit.java  |  24 ++
 .../TestRegionReplicaReplicationError.java         |  39 +++-
 .../TestRegionReplicationForSkipWAL.java           | 193 +++++++++++++++
 .../regionserver/TestRegionReplicaReplication.java |  33 ++-
 6 files changed, 502 insertions(+), 60 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index d14bb1a7e7b..f71a94ad4ef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2891,7 +2891,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
        * {@link FlushDescriptor} and attach the {@link RegionReplicationSink#add} to the
        * flushOpSeqIdMVCCEntry,see HBASE-26960 for more details.
        */
-      this.attachReplicateRegionReplicaToFlushOpSeqIdMVCCEntry(flushOpSeqIdMVCCEntry, desc, sink);
+      this.attachRegionReplicationToFlushOpSeqIdMVCCEntry(flushOpSeqIdMVCCEntry, desc, sink);
       return false;
     }
 
@@ -2912,7 +2912,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * Create {@link WALEdit} for {@link FlushDescriptor} and attach {@link RegionReplicationSink#add}
    * to the flushOpSeqIdMVCCEntry.
    */
-  private void attachReplicateRegionReplicaToFlushOpSeqIdMVCCEntry(WriteEntry flushOpSeqIdMVCCEntry,
+  private void attachRegionReplicationToFlushOpSeqIdMVCCEntry(WriteEntry flushOpSeqIdMVCCEntry,
     FlushDescriptor desc, RegionReplicationSink sink) {
     assert !flushOpSeqIdMVCCEntry.getCompletionAction().isPresent();
     WALEdit flushMarkerWALEdit = WALEdit.createFlushWALEdit(getRegionInfo(), desc);
@@ -3372,8 +3372,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
      * Write mini-batch operations to MemStore
      */
     public abstract WriteEntry writeMiniBatchOperationsToMemStore(
-      final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry)
-      throws IOException;
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry,
+      long now) throws IOException;
 
     protected void writeMiniBatchOperationsToMemStore(
       final MiniBatchOperationInProgress<Mutation> miniBatchOp, final long writeNumber)
@@ -3592,6 +3592,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         walEditsFromCoprocessors, nextIndexToProcess, lastIndexExclusive, readyToWriteCount);
     }
 
+    protected WALEdit createWALEdit(final MiniBatchOperationInProgress<Mutation> miniBatchOp) {
+      return new WALEdit(miniBatchOp.getCellCount(), isInReplay());
+    }
+
     /**
      * Builds separate WALEdit per nonce by applying input mutations. If WALEdits from CP are
      * present, they are merged to result WALEdit.
@@ -3609,6 +3613,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           // we use durability of the original mutation for the mutation passed by CP.
           if (region.getEffectiveDurability(m.getDurability()) == Durability.SKIP_WAL) {
             region.recordMutationWithoutWal(m.getFamilyCellMap());
+            /**
+             * Here is for HBASE-26993,in order to make the new framework for region replication
+             * could work for SKIP_WAL, we save the {@link Mutation} which
+             * {@link Mutation#getDurability} is {@link Durability#SKIP_WAL} in miniBatchOp.
+             */
+            cacheSkipWALMutationForRegionReplication(miniBatchOp, walEdits, familyCellMaps[index]);
             return true;
           }
 
@@ -3622,27 +3632,38 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
               || curWALEditForNonce.getFirst().getNonceGroup() != nonceGroup
               || curWALEditForNonce.getFirst().getNonce() != nonce
           ) {
-            curWALEditForNonce = new Pair<>(new NonceKey(nonceGroup, nonce),
-              new WALEdit(miniBatchOp.getCellCount(), isInReplay()));
+            curWALEditForNonce =
+              new Pair<>(new NonceKey(nonceGroup, nonce), createWALEdit(miniBatchOp));
             walEdits.add(curWALEditForNonce);
           }
           WALEdit walEdit = curWALEditForNonce.getSecond();
 
           // Add WAL edits from CPs.
           WALEdit fromCP = walEditsFromCoprocessors[index];
-          if (fromCP != null) {
-            for (Cell cell : fromCP.getCells()) {
-              walEdit.add(cell);
-            }
-          }
-          walEdit.add(familyCellMaps[index]);
-
+          List<Cell> cellsFromCP = fromCP == null ? Collections.emptyList() : fromCP.getCells();
+          addNonSkipWALMutationsToWALEdit(miniBatchOp, walEdit, cellsFromCP, familyCellMaps[index]);
           return true;
         }
       });
       return walEdits;
     }
 
+    protected void addNonSkipWALMutationsToWALEdit(
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp, WALEdit walEdit,
+      List<Cell> cellsFromCP, Map<byte[], List<Cell>> familyCellMap) {
+      doAddCellsToWALEdit(walEdit, cellsFromCP, familyCellMap);
+    }
+
+    protected static void doAddCellsToWALEdit(WALEdit walEdit, List<Cell> cellsFromCP,
+      Map<byte[], List<Cell>> familyCellMap) {
+      walEdit.add(cellsFromCP);
+      walEdit.add(familyCellMap);
+    }
+
+    protected abstract void cacheSkipWALMutationForRegionReplication(
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp,
+      List<Pair<NonceKey, WALEdit>> walEdits, Map<byte[], List<Cell>> familyCellMap);
+
     /**
      * This method completes mini-batch operations by calling postBatchMutate() CP hook (if
      * required) and completing mvcc.
@@ -3717,6 +3738,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     private long nonceGroup;
     private long nonce;
     protected boolean canProceed;
+    private boolean regionReplicateEnable;
 
     public MutationBatchOperation(final HRegion region, Mutation[] operations, boolean atomic,
       long nonceGroup, long nonce) {
@@ -3724,6 +3746,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       this.atomic = atomic;
       this.nonceGroup = nonceGroup;
       this.nonce = nonce;
+      this.regionReplicateEnable = region.regionReplicationSink.isPresent();
     }
 
     @Override
@@ -4140,17 +4163,115 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       return walEdits;
     }
 
+    /**
+     * Here is for HBASE-26993,in order to make the new framework for region replication could work
+     * for SKIP_WAL, we save the {@link Mutation} which {@link Mutation#getDurability} is
+     * {@link Durability#SKIP_WAL} in miniBatchOp.
+     */
+    @Override
+    protected void cacheSkipWALMutationForRegionReplication(
+      MiniBatchOperationInProgress<Mutation> miniBatchOp,
+      List<Pair<NonceKey, WALEdit>> nonceKeyAndWALEdits, Map<byte[], List<Cell>> familyCellMap) {
+      if (!this.regionReplicateEnable) {
+        return;
+      }
+
+      WALEdit walEditForReplicateIfExistsSkipWAL =
+        miniBatchOp.getWalEditForReplicateIfExistsSkipWAL();
+      /**
+       * When there is a SKIP_WAL {@link Mutation},we create a new {@link WALEdit} for replicating
+       * to region replica,first we fill the existing {@link WALEdit} to it and then add the
+       * {@link Mutation} which is SKIP_WAL to it.
+       */
+      if (walEditForReplicateIfExistsSkipWAL == null) {
+        walEditForReplicateIfExistsSkipWAL =
+          this.createWALEditForReplicateSkipWAL(miniBatchOp, nonceKeyAndWALEdits);
+        miniBatchOp.setWalEditForReplicateIfExistsSkipWAL(walEditForReplicateIfExistsSkipWAL);
+      }
+      walEditForReplicateIfExistsSkipWAL.add(familyCellMap);
+
+    }
+
+    private WALEdit createWALEditForReplicateSkipWAL(
+      MiniBatchOperationInProgress<Mutation> miniBatchOp,
+      List<Pair<NonceKey, WALEdit>> nonceKeyAndWALEdits) {
+      if (nonceKeyAndWALEdits.isEmpty()) {
+        return this.createWALEdit(miniBatchOp);
+      }
+      // for MutationBatchOperation, more than one nonce is not allowed
+      assert nonceKeyAndWALEdits.size() == 1;
+      WALEdit currentWALEdit = nonceKeyAndWALEdits.get(0).getSecond();
+      return new WALEdit(currentWALEdit);
+    }
+
+    @Override
+    protected void addNonSkipWALMutationsToWALEdit(
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp, WALEdit walEdit,
+      List<Cell> cellsFromCP, Map<byte[], List<Cell>> familyCellMap) {
+
+      super.addNonSkipWALMutationsToWALEdit(miniBatchOp, walEdit, cellsFromCP, familyCellMap);
+      WALEdit walEditForReplicateIfExistsSkipWAL =
+        miniBatchOp.getWalEditForReplicateIfExistsSkipWAL();
+      if (walEditForReplicateIfExistsSkipWAL == null) {
+        return;
+      }
+      /**
+       * When walEditForReplicateIfExistsSkipWAL is not null,it means there exists SKIP_WAL
+       * {@link Mutation} and we create a new {@link WALEdit} in
+       * {@link MutationBatchOperation#cacheSkipWALMutationForReplicateRegionReplica} for
+       * replicating to region replica, so here we also add non SKIP_WAL{@link Mutation}s to
+       * walEditForReplicateIfExistsSkipWAL.
+       */
+      doAddCellsToWALEdit(walEditForReplicateIfExistsSkipWAL, cellsFromCP, familyCellMap);
+    }
+
     @Override
     public WriteEntry writeMiniBatchOperationsToMemStore(
-      final MiniBatchOperationInProgress<Mutation> miniBatchOp, @Nullable WriteEntry writeEntry)
-      throws IOException {
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp, @Nullable WriteEntry writeEntry,
+      long now) throws IOException {
+      boolean newWriteEntry = false;
       if (writeEntry == null) {
         writeEntry = region.mvcc.begin();
+        newWriteEntry = true;
       }
       super.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry.getWriteNumber());
+      if (newWriteEntry) {
+        /**
+         * Here is for HBASE-26993 case 2,all {@link Mutation}s are {@link Durability#SKIP_WAL}. In
+         * order to make the new framework for region replication could work for SKIP_WAL,because
+         * there is no {@link RegionReplicationSink#add} attached in {@link HRegion#doWALAppend},so
+         * here we get {@link WALEdit} from
+         * {@link MiniBatchOperationInProgress#getWalEditForReplicateIfExistsSkipWAL} and attach
+         * {@link RegionReplicationSink#add} to the new mvcc writeEntry.
+         */
+        attachRegionReplicationToMVCCEntry(miniBatchOp, writeEntry, now);
+      }
       return writeEntry;
     }
 
+    private WALKeyImpl createWALKey(long now) {
+      // for MutationBatchOperation,isReplay is false.
+      return this.region.createWALKeyForWALAppend(false, this, now, this.nonceGroup, this.nonce);
+    }
+
+    /**
+     * Create {@link WALKeyImpl} and get {@link WALEdit} from miniBatchOp and attach
+     * {@link RegionReplicationSink#add} to the mvccWriteEntry.
+     */
+    private void attachRegionReplicationToMVCCEntry(
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp, WriteEntry mvccWriteEntry, long now)
+      throws IOException {
+      if (!this.regionReplicateEnable) {
+        return;
+      }
+      assert !mvccWriteEntry.getCompletionAction().isPresent();
+
+      final WALKeyImpl walKey = this.createWALKey(now);
+      walKey.setWriteEntry(mvccWriteEntry);
+      region.doAttachReplicateRegionReplicaAction(walKey,
+        miniBatchOp.getWalEditForReplicateIfExistsSkipWAL(), mvccWriteEntry);
+    }
+
     @Override
     public void completeMiniBatchOperations(
       final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry)
@@ -4466,8 +4587,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     @Override
     public WriteEntry writeMiniBatchOperationsToMemStore(
-      final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry)
-      throws IOException {
+      final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry,
+      long now) throws IOException {
       super.writeMiniBatchOperationsToMemStore(miniBatchOp, getOrigLogSeqNum());
       return writeEntry;
     }
@@ -4479,6 +4600,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       super.completeMiniBatchOperations(miniBatchOp, writeEntry);
       region.mvcc.advanceTo(getOrigLogSeqNum());
     }
+
+    @Override
+    protected void cacheSkipWALMutationForRegionReplication(
+      MiniBatchOperationInProgress<Mutation> miniBatchOp, List<Pair<NonceKey, WALEdit>> walEdits,
+      Map<byte[], List<Cell>> familyCellMap) {
+      // There is no action to do if current region is secondary replica
+    }
+
   }
 
   public OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic, long nonceGroup,
@@ -4647,8 +4776,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         NonceKey nonceKey = nonceKeyWALEditPair.getFirst();
 
         if (walEdit != null && !walEdit.isEmpty()) {
-          writeEntry = doWALAppend(walEdit, batchOp.durability, batchOp.getClusterIds(), now,
-            nonceKey.getNonceGroup(), nonceKey.getNonce(), batchOp.getOrigLogSeqNum());
+          writeEntry = doWALAppend(walEdit, batchOp, miniBatchOp, now, nonceKey);
         }
 
         // Complete mvcc for all but last writeEntry (for replay case)
@@ -4660,7 +4788,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
       // STEP 5. Write back to memStore
       // NOTE: writeEntry can be null here
-      writeEntry = batchOp.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry);
+      writeEntry = batchOp.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry, now);
 
       // STEP 6. Complete MiniBatchOperations: If required calls postBatchMutate() CP hook and
       // complete mvcc for last writeEntry
@@ -7903,42 +8031,46 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }, () -> createRegionSpan("Region.increment"));
   }
 
+  private WALKeyImpl createWALKeyForWALAppend(boolean isReplay, BatchOperation<?> batchOp, long now,
+    long nonceGroup, long nonce) {
+    WALKeyImpl walKey = isReplay
+      ? new WALKeyImpl(this.getRegionInfo().getEncodedNameAsBytes(),
+        this.htableDescriptor.getTableName(), SequenceId.NO_SEQUENCE_ID, now,
+        batchOp.getClusterIds(), nonceGroup, nonce, mvcc)
+      : new WALKeyImpl(this.getRegionInfo().getEncodedNameAsBytes(),
+        this.htableDescriptor.getTableName(), SequenceId.NO_SEQUENCE_ID, now,
+        batchOp.getClusterIds(), nonceGroup, nonce, mvcc, this.getReplicationScope());
+    if (isReplay) {
+      walKey.setOrigLogSeqNum(batchOp.getOrigLogSeqNum());
+    }
+    return walKey;
+  }
+
   /**
    * @return writeEntry associated with this append
    */
-  private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID> clusterIds,
-    long now, long nonceGroup, long nonce, long origLogSeqNum) throws IOException {
+  private WriteEntry doWALAppend(WALEdit walEdit, BatchOperation<?> batchOp,
+    MiniBatchOperationInProgress<Mutation> miniBatchOp, long now, NonceKey nonceKey)
+    throws IOException {
     Preconditions.checkArgument(walEdit != null && !walEdit.isEmpty(), "WALEdit is null or empty!");
-    Preconditions.checkArgument(!walEdit.isReplay() || origLogSeqNum != SequenceId.NO_SEQUENCE_ID,
+    Preconditions.checkArgument(
+      !walEdit.isReplay() || batchOp.getOrigLogSeqNum() != SequenceId.NO_SEQUENCE_ID,
       "Invalid replay sequence Id for replay WALEdit!");
-    // Using default cluster id, as this can only happen in the originating cluster.
-    // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey
-    // here instead of WALKeyImpl directly to support legacy coprocessors.
-    WALKeyImpl walKey = walEdit.isReplay()
-      ? new WALKeyImpl(this.getRegionInfo().getEncodedNameAsBytes(),
-        this.htableDescriptor.getTableName(), SequenceId.NO_SEQUENCE_ID, now, clusterIds,
-        nonceGroup, nonce, mvcc)
-      : new WALKeyImpl(this.getRegionInfo().getEncodedNameAsBytes(),
-        this.htableDescriptor.getTableName(), SequenceId.NO_SEQUENCE_ID, now, clusterIds,
-        nonceGroup, nonce, mvcc, this.getReplicationScope());
-    if (walEdit.isReplay()) {
-      walKey.setOrigLogSeqNum(origLogSeqNum);
-    }
+
+    WALKeyImpl walKey = createWALKeyForWALAppend(walEdit.isReplay(), batchOp, now,
+      nonceKey.getNonceGroup(), nonceKey.getNonce());
     // don't call the coproc hook for writes to the WAL caused by
     // system lifecycle events like flushes or compactions
     if (this.coprocessorHost != null && !walEdit.isMetaEdit()) {
       this.coprocessorHost.preWALAppend(walKey, walEdit);
     }
-    ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null);
     try {
       long txid = this.wal.appendData(this.getRegionInfo(), walKey, walEdit);
       WriteEntry writeEntry = walKey.getWriteEntry();
-      regionReplicationSink.ifPresent(sink -> writeEntry.attachCompletionAction(() -> {
-        sink.add(walKey, walEdit, rpcCall);
-      }));
+      this.attachRegionReplicationInWALAppend(batchOp, miniBatchOp, walKey, walEdit, writeEntry);
       // Call sync on our edit.
       if (txid != 0) {
-        sync(txid, durability);
+        sync(txid, batchOp.durability);
       }
       return writeEntry;
     } catch (IOException ioe) {
@@ -7947,7 +8079,51 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
       throw ioe;
     }
+  }
 
+  /**
+   * Attach {@link RegionReplicationSink#add} to the mvcc writeEntry for replicating to region
+   * replica.
+   */
+  private void attachRegionReplicationInWALAppend(BatchOperation<?> batchOp,
+    MiniBatchOperationInProgress<Mutation> miniBatchOp, WALKeyImpl walKey, WALEdit walEdit,
+    WriteEntry writeEntry) throws IOException {
+    if (!regionReplicationSink.isPresent()) {
+      return;
+    }
+    /**
+     * If {@link HRegion#regionReplicationSink} is present,only {@link MutationBatchOperation} is
+     * used and {@link NonceKey} is all the same for {@link Mutation}s in
+     * {@link MutationBatchOperation},so for HBASE-26993 case 1,if
+     * {@link MiniBatchOperationInProgress#getWalEditForReplicateSkipWAL} is not null and we could
+     * enter {@link HRegion#doWALAppend},that means partial {@link Mutation}s are
+     * {@link Durability#SKIP_WAL}, we use
+     * {@link MiniBatchOperationInProgress#getWalEditForReplicateSkipWAL} to replicate to region
+     * replica,but if {@link MiniBatchOperationInProgress#getWalEditForReplicateSkipWAL} is
+     * null,that means there is no {@link Mutation} is {@link Durability#SKIP_WAL},so we just use
+     * walEdit to replicate.
+     */
+    assert batchOp instanceof MutationBatchOperation;
+    WALEdit walEditToUse = miniBatchOp.getWalEditForReplicateIfExistsSkipWAL();
+    if (walEditToUse == null) {
+      walEditToUse = walEdit;
+    }
+    doAttachReplicateRegionReplicaAction(walKey, walEditToUse, writeEntry);
+  }
+
+  /**
+   * Attach {@link RegionReplicationSink#add} to the mvcc writeEntry for replicating to region
+   * replica.
+   */
+  private void doAttachReplicateRegionReplicaAction(WALKeyImpl walKey, WALEdit walEdit,
+    WriteEntry writeEntry) throws IOException {
+    if (walEdit == null || walEdit.isEmpty()) {
+      return;
+    }
+    final ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null);
+    regionReplicationSink.ifPresent(sink -> writeEntry.attachCompletionAction(() -> {
+      sink.add(walKey, walEdit, rpcCall);
+    }));
   }
 
   public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HRegion.class, false);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
index 0fa18cddb6f..69fb426f394 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
@@ -48,6 +48,13 @@ public class MiniBatchOperationInProgress<T> {
   private int numOfDeletes = 0;
   private int numOfIncrements = 0;
   private int numOfAppends = 0;
+  /**
+   * Here is for HBASE-26993,saving the all the {@link Mutation}s if there is
+   * {@link Durability#SKIP_WAL} in {@link HRegion.BatchOperation#buildWALEdits} for
+   * {@link HRegion#doMiniBatchMutate} to also replicate {@link Mutation} which is
+   * {@link Durability#SKIP_WAL} to region replica.
+   */
+  private WALEdit walEditForReplicateIfExistsSkipWAL = null;
 
   public MiniBatchOperationInProgress(T[] operations, OperationStatus[] retCodeDetails,
     WALEdit[] walEditsFromCoprocessors, int firstIndex, int lastIndexExclusive,
@@ -182,4 +189,12 @@ public class MiniBatchOperationInProgress<T> {
   public void incrementNumOfAppends() {
     this.numOfAppends += 1;
   }
+
+  public WALEdit getWalEditForReplicateIfExistsSkipWAL() {
+    return walEditForReplicateIfExistsSkipWAL;
+  }
+
+  public void setWalEditForReplicateIfExistsSkipWAL(WALEdit walEditForReplicateSkipWAL) {
+    this.walEditForReplicateIfExistsSkipWAL = walEditForReplicateSkipWAL;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java
index 6794c2d5bd2..c688f6b1de5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java
@@ -177,6 +177,19 @@ public class WALEdit implements HeapSize {
     cells = new ArrayList<>(cellCount);
   }
 
+  /**
+   * Create a new WALEdit from a existing {@link WALEdit}.
+   */
+  public WALEdit(WALEdit walEdit) {
+    this.replay = walEdit.replay;
+    cells = new ArrayList<>(walEdit.cells);
+    if (walEdit.families != null) {
+      this.families = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+      this.families.addAll(walEdit.families);
+    }
+
+  }
+
   private Set<byte[]> getOrCreateFamilies() {
     if (this.families == null) {
       this.families = new TreeSet<>(Bytes.BYTES_COMPARATOR);
@@ -237,6 +250,17 @@ public class WALEdit implements HeapSize {
     return add(cell, CellUtil.cloneFamily(cell));
   }
 
+  @InterfaceAudience.Private
+  public WALEdit add(List<Cell> cells) {
+    if (cells == null || cells.isEmpty()) {
+      return this;
+    }
+    for (Cell cell : cells) {
+      add(cell);
+    }
+    return this;
+  }
+
   public boolean isEmpty() {
     return cells.isEmpty();
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaReplicationError.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaReplicationError.java
index e7022cfaa1c..e0d7895e083 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaReplicationError.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaReplicationError.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -28,6 +29,7 @@ import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
 import org.apache.hadoop.hbase.StartTestingClusterOption;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -37,6 +39,7 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.testclassification.FlakeyTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ConcurrentMapUtils;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -69,7 +72,8 @@ public class TestRegionReplicaReplicationError {
 
   public static final class ErrorReplayRSRpcServices extends RSRpcServices {
 
-    private final AtomicInteger count = new AtomicInteger(0);
+    private final ConcurrentHashMap<HRegion, AtomicInteger> regionToCounter =
+      new ConcurrentHashMap<HRegion, AtomicInteger>();
 
     public ErrorReplayRSRpcServices(HRegionServer rs) throws IOException {
       super(rs);
@@ -89,8 +93,12 @@ public class TestRegionReplicaReplicationError {
       } catch (NotServingRegionException e) {
         throw new ServiceException(e);
       }
+
+      AtomicInteger counter =
+        ConcurrentMapUtils.computeIfAbsent(regionToCounter, region, () -> new AtomicInteger(0));
+
       // fail the first several request
-      if (region.getRegionInfo().getReplicaId() == 1 && count.addAndGet(entries.size()) < 100) {
+      if (region.getRegionInfo().getReplicaId() == 1 && counter.addAndGet(entries.size()) < 100) {
         throw new ServiceException("Inject error!");
       }
       return super.replicateToReplica(controller, request);
@@ -112,7 +120,7 @@ public class TestRegionReplicaReplicationError {
 
   private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
 
-  private static TableName TN = TableName.valueOf("test");
+  private static String TN = "test";
 
   private static byte[] CF = Bytes.toBytes("cf");
 
@@ -124,9 +132,6 @@ public class TestRegionReplicaReplicationError {
       true);
     HTU.startMiniCluster(
       StartTestingClusterOption.builder().rsClass(RSForTest.class).numRegionServers(3).build());
-    TableDescriptor td = TableDescriptorBuilder.newBuilder(TN).setRegionReplication(3)
-      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build();
-    HTU.getAdmin().createTable(td);
   }
 
   @AfterClass
@@ -145,8 +150,26 @@ public class TestRegionReplicaReplicationError {
   }
 
   @Test
-  public void test() throws IOException {
-    try (Table table = HTU.getConnection().getTable(TN)) {
+  public void testDefaultDurability() throws IOException {
+    doTest(false);
+  }
+
+  @Test
+  public void testSkipWAL() throws IOException {
+    doTest(true);
+  }
+
+  private void doTest(boolean skipWAL) throws IOException {
+    TableName tableName = TableName.valueOf(TN + (skipWAL ? "_skipWAL" : ""));
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
+      .setRegionReplication(3).setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF));
+    if (skipWAL) {
+      builder.setDurability(Durability.SKIP_WAL);
+    }
+    TableDescriptor td = builder.build();
+    HTU.getAdmin().createTable(td);
+
+    try (Table table = HTU.getConnection().getTable(tableName)) {
       for (int i = 0; i < 500; i++) {
         table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
       }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationForSkipWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationForSkipWAL.java
new file mode 100644
index 00000000000..e8880267118
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationForSkipWAL.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.regionreplication;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.StartTestingClusterOption;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Consistency;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, LargeTests.class })
+public class TestRegionReplicationForSkipWAL {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRegionReplicationForSkipWAL.class);
+
+  private static final byte[] FAM1 = Bytes.toBytes("family_test1");
+
+  private static final byte[] QUAL1 = Bytes.toBytes("qualifier_test1");
+
+  private static final byte[] FAM2 = Bytes.toBytes("family_test2");
+
+  private static final byte[] QUAL2 = Bytes.toBytes("qualifier_test2");
+
+  private static final byte[] FAM3 = Bytes.toBytes("family_test3");
+
+  private static final byte[] QUAL3 = Bytes.toBytes("qualifier_test3");
+
+  private static final byte[] FAM4 = Bytes.toBytes("family_test4");
+
+  private static final byte[] QUAL4 = Bytes.toBytes("qualifier_test4");
+
+  private static final byte[] FAM5 = Bytes.toBytes("family_test5");
+
+  private static final byte[] QUAL5 = Bytes.toBytes("qualifier_test5");
+
+  private static final byte[] FAM6 = Bytes.toBytes("family_test6");
+
+  private static final byte[] QUAL6 = Bytes.toBytes("qualifier_test6");
+
+  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
+  private static final int NB_SERVERS = 2;
+
+  private static final String strTableName = "TestRegionReplicationForSkipWAL";
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Configuration conf = HTU.getConfiguration();
+    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
+    conf.setBoolean(RegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false);
+    HTU.startMiniCluster(StartTestingClusterOption.builder().numRegionServers(NB_SERVERS).build());
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    HTU.shutdownMiniCluster();
+  }
+
+  /**
+   * This test is for HBASE-26933,make the new region replication framework introduced by
+   * HBASE-26233 work for table which DURABILITY is Durability.SKIP_WAL.
+   */
+  @Test
+  public void testReplicateToReplicaWhenSkipWAL() throws Exception {
+    final HRegion[] skipWALRegions = this.createTable(true);
+    byte[] rowKey1 = Bytes.toBytes(1);
+    byte[] value1 = Bytes.toBytes(2);
+
+    byte[] rowKey2 = Bytes.toBytes(2);
+    byte[] value2 = Bytes.toBytes(4);
+
+    // Test the table is skipWAL
+    skipWALRegions[0].batchMutate(new Mutation[] { new Put(rowKey1).addColumn(FAM1, QUAL1, value1),
+      new Put(rowKey2).addColumn(FAM2, QUAL2, value2) });
+
+    try (Table skipWALTable = HTU.getConnection().getTable(getTableName(true))) {
+      HTU.waitFor(30000, () -> checkReplica(skipWALTable, FAM1, QUAL1, rowKey1, value1)
+        && checkReplica(skipWALTable, FAM2, QUAL2, rowKey2, value2));
+    }
+
+    byte[] rowKey3 = Bytes.toBytes(3);
+    byte[] value3 = Bytes.toBytes(6);
+    byte[] rowKey4 = Bytes.toBytes(4);
+    byte[] value4 = Bytes.toBytes(8);
+    byte[] rowKey5 = Bytes.toBytes(5);
+    byte[] value5 = Bytes.toBytes(10);
+    byte[] rowKey6 = Bytes.toBytes(6);
+    byte[] value6 = Bytes.toBytes(12);
+
+    // Test the table is normal,but the Put is skipWAL
+    final HRegion[] normalRegions = this.createTable(false);
+    normalRegions[0].batchMutate(new Mutation[] { new Put(rowKey3).addColumn(FAM3, QUAL3, value3),
+      new Put(rowKey4).addColumn(FAM4, QUAL4, value4).setDurability(Durability.SKIP_WAL),
+      new Put(rowKey5).addColumn(FAM5, QUAL5, value5).setDurability(Durability.SKIP_WAL),
+      new Put(rowKey6).addColumn(FAM6, QUAL6, value6) });
+
+    try (Table normalTable = HTU.getConnection().getTable(getTableName(false))) {
+      HTU.waitFor(30000,
+        () -> checkReplica(normalTable, FAM3, QUAL3, rowKey3, value3)
+          && checkReplica(normalTable, FAM4, QUAL4, rowKey4, value4)
+          && checkReplica(normalTable, FAM5, QUAL5, rowKey5, value5)
+          && checkReplica(normalTable, FAM6, QUAL6, rowKey6, value6));
+    }
+  }
+
+  private static boolean checkReplica(Table table, byte[] fam, byte[] qual, byte[] rowKey,
+    byte[] expectValue) throws IOException {
+    Get get = new Get(rowKey).setConsistency(Consistency.TIMELINE).setReplicaId(1);
+    Result result = table.get(get);
+    byte[] value = result.getValue(fam, qual);
+    return value != null && value.length > 0 && Arrays.equals(expectValue, value);
+  }
+
+  private TableName getTableName(boolean skipWAL) {
+    return TableName.valueOf(strTableName + (skipWAL ? "_skipWAL" : ""));
+  }
+
+  private HRegion[] createTable(boolean skipWAL) throws Exception {
+    TableName tableName = getTableName(skipWAL);
+    TableDescriptorBuilder builder =
+      TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(NB_SERVERS)
+        .setColumnFamilies(Arrays.asList(ColumnFamilyDescriptorBuilder.of(FAM1),
+          ColumnFamilyDescriptorBuilder.of(FAM2), ColumnFamilyDescriptorBuilder.of(FAM3),
+          ColumnFamilyDescriptorBuilder.of(FAM4), ColumnFamilyDescriptorBuilder.of(FAM5),
+          ColumnFamilyDescriptorBuilder.of(FAM6)));
+    if (skipWAL) {
+      builder.setDurability(Durability.SKIP_WAL);
+
+    }
+    TableDescriptor tableDescriptor = builder.build();
+
+    HTU.getAdmin().createTable(tableDescriptor);
+    final HRegion[] regions = new HRegion[NB_SERVERS];
+    for (int i = 0; i < NB_SERVERS; i++) {
+      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
+      List<HRegion> onlineRegions = rs.getRegions(tableName);
+      for (HRegion region : onlineRegions) {
+        int replicaId = region.getRegionInfo().getReplicaId();
+        assertTrue(regions[replicaId] == null);
+        regions[region.getRegionInfo().getReplicaId()] = region;
+      }
+    }
+    for (Region region : regions) {
+      assertNotNull(region);
+    }
+    return regions;
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
index 0a07f06cc76..777232f68e9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
@@ -31,8 +31,10 @@ import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Region;
@@ -96,16 +98,22 @@ public class TestRegionReplicaReplication {
     HTU.shutdownMiniCluster();
   }
 
-  private void testRegionReplicaReplication(int regionReplication) throws Exception {
+  private void testRegionReplicaReplication(int regionReplication, boolean skipWAL)
+    throws Exception {
     // test region replica replication. Create a table with single region, write some data
     // ensure that data is replicated to the secondary region
-    TableName tableName =
-      TableName.valueOf("testRegionReplicaReplicationWithReplicas_" + regionReplication);
-    TableDescriptor htd = HTU
-      .createModifyableTableDescriptor(TableName.valueOf(tableName.toString()),
-        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
-        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
-      .setRegionReplication(regionReplication).build();
+    TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_"
+      + regionReplication + (skipWAL ? "_skipWAL" : ""));
+    TableDescriptorBuilder builder =
+      HTU
+        .createModifyableTableDescriptor(TableName.valueOf(tableName.toString()),
+          ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
+          ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
+        .setRegionReplication(regionReplication);
+    if (skipWAL) {
+      builder.setDurability(Durability.SKIP_WAL);
+    }
+    TableDescriptor htd = builder.build();
     createOrEnableTableWithRetries(htd, true);
     TableName tableNameNoReplicas =
       TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS");
@@ -171,17 +179,20 @@ public class TestRegionReplicaReplication {
 
   @Test
   public void testRegionReplicaReplicationWith2Replicas() throws Exception {
-    testRegionReplicaReplication(2);
+    testRegionReplicaReplication(2, false);
+    testRegionReplicaReplication(2, true);
   }
 
   @Test
   public void testRegionReplicaReplicationWith3Replicas() throws Exception {
-    testRegionReplicaReplication(3);
+    testRegionReplicaReplication(3, false);
+    testRegionReplicaReplication(3, true);
   }
 
   @Test
   public void testRegionReplicaReplicationWith10Replicas() throws Exception {
-    testRegionReplicaReplication(10);
+    testRegionReplicaReplication(10, false);
+    testRegionReplicaReplication(10, true);
   }
 
   @Test