You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2022/06/03 08:39:23 UTC
[hbase] branch master updated: HBASE-26993 Make the new framework for region replication could work for SKIP_WAL (#4392)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new d57159f31cb HBASE-26993 Make the new framework for region replication could work for SKIP_WAL (#4392)
d57159f31cb is described below
commit d57159f31cb7be4d9ced0d7b95e2c78c43d160a1
Author: chenglei <ch...@apache.org>
AuthorDate: Fri Jun 3 16:39:17 2022 +0800
HBASE-26993 Make the new framework for region replication could work for SKIP_WAL (#4392)
Signed-off-by: Duo Zhang <zh...@apache.org>
---
.../apache/hadoop/hbase/regionserver/HRegion.java | 258 +++++++++++++++++----
.../regionserver/MiniBatchOperationInProgress.java | 15 ++
.../java/org/apache/hadoop/hbase/wal/WALEdit.java | 24 ++
.../TestRegionReplicaReplicationError.java | 39 +++-
.../TestRegionReplicationForSkipWAL.java | 193 +++++++++++++++
.../regionserver/TestRegionReplicaReplication.java | 33 ++-
6 files changed, 502 insertions(+), 60 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index d14bb1a7e7b..f71a94ad4ef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2891,7 +2891,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* {@link FlushDescriptor} and attach the {@link RegionReplicationSink#add} to the
* flushOpSeqIdMVCCEntry,see HBASE-26960 for more details.
*/
- this.attachReplicateRegionReplicaToFlushOpSeqIdMVCCEntry(flushOpSeqIdMVCCEntry, desc, sink);
+ this.attachRegionReplicationToFlushOpSeqIdMVCCEntry(flushOpSeqIdMVCCEntry, desc, sink);
return false;
}
@@ -2912,7 +2912,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* Create {@link WALEdit} for {@link FlushDescriptor} and attach {@link RegionReplicationSink#add}
* to the flushOpSeqIdMVCCEntry.
*/
- private void attachReplicateRegionReplicaToFlushOpSeqIdMVCCEntry(WriteEntry flushOpSeqIdMVCCEntry,
+ private void attachRegionReplicationToFlushOpSeqIdMVCCEntry(WriteEntry flushOpSeqIdMVCCEntry,
FlushDescriptor desc, RegionReplicationSink sink) {
assert !flushOpSeqIdMVCCEntry.getCompletionAction().isPresent();
WALEdit flushMarkerWALEdit = WALEdit.createFlushWALEdit(getRegionInfo(), desc);
@@ -3372,8 +3372,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* Write mini-batch operations to MemStore
*/
public abstract WriteEntry writeMiniBatchOperationsToMemStore(
- final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry)
- throws IOException;
+ final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry,
+ long now) throws IOException;
protected void writeMiniBatchOperationsToMemStore(
final MiniBatchOperationInProgress<Mutation> miniBatchOp, final long writeNumber)
@@ -3592,6 +3592,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
walEditsFromCoprocessors, nextIndexToProcess, lastIndexExclusive, readyToWriteCount);
}
+ protected WALEdit createWALEdit(final MiniBatchOperationInProgress<Mutation> miniBatchOp) {
+ return new WALEdit(miniBatchOp.getCellCount(), isInReplay());
+ }
+
/**
* Builds separate WALEdit per nonce by applying input mutations. If WALEdits from CP are
* present, they are merged to result WALEdit.
@@ -3609,6 +3613,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// we use durability of the original mutation for the mutation passed by CP.
if (region.getEffectiveDurability(m.getDurability()) == Durability.SKIP_WAL) {
region.recordMutationWithoutWal(m.getFamilyCellMap());
+ /**
+ * Here is for HBASE-26993,in order to make the new framework for region replication
+ * could work for SKIP_WAL, we save the {@link Mutation} which
+ * {@link Mutation#getDurability} is {@link Durability#SKIP_WAL} in miniBatchOp.
+ */
+ cacheSkipWALMutationForRegionReplication(miniBatchOp, walEdits, familyCellMaps[index]);
return true;
}
@@ -3622,27 +3632,38 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|| curWALEditForNonce.getFirst().getNonceGroup() != nonceGroup
|| curWALEditForNonce.getFirst().getNonce() != nonce
) {
- curWALEditForNonce = new Pair<>(new NonceKey(nonceGroup, nonce),
- new WALEdit(miniBatchOp.getCellCount(), isInReplay()));
+ curWALEditForNonce =
+ new Pair<>(new NonceKey(nonceGroup, nonce), createWALEdit(miniBatchOp));
walEdits.add(curWALEditForNonce);
}
WALEdit walEdit = curWALEditForNonce.getSecond();
// Add WAL edits from CPs.
WALEdit fromCP = walEditsFromCoprocessors[index];
- if (fromCP != null) {
- for (Cell cell : fromCP.getCells()) {
- walEdit.add(cell);
- }
- }
- walEdit.add(familyCellMaps[index]);
-
+ List<Cell> cellsFromCP = fromCP == null ? Collections.emptyList() : fromCP.getCells();
+ addNonSkipWALMutationsToWALEdit(miniBatchOp, walEdit, cellsFromCP, familyCellMaps[index]);
return true;
}
});
return walEdits;
}
+ protected void addNonSkipWALMutationsToWALEdit(
+ final MiniBatchOperationInProgress<Mutation> miniBatchOp, WALEdit walEdit,
+ List<Cell> cellsFromCP, Map<byte[], List<Cell>> familyCellMap) {
+ doAddCellsToWALEdit(walEdit, cellsFromCP, familyCellMap);
+ }
+
+ protected static void doAddCellsToWALEdit(WALEdit walEdit, List<Cell> cellsFromCP,
+ Map<byte[], List<Cell>> familyCellMap) {
+ walEdit.add(cellsFromCP);
+ walEdit.add(familyCellMap);
+ }
+
+ protected abstract void cacheSkipWALMutationForRegionReplication(
+ final MiniBatchOperationInProgress<Mutation> miniBatchOp,
+ List<Pair<NonceKey, WALEdit>> walEdits, Map<byte[], List<Cell>> familyCellMap);
+
/**
* This method completes mini-batch operations by calling postBatchMutate() CP hook (if
* required) and completing mvcc.
@@ -3717,6 +3738,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private long nonceGroup;
private long nonce;
protected boolean canProceed;
+ private boolean regionReplicateEnable;
public MutationBatchOperation(final HRegion region, Mutation[] operations, boolean atomic,
long nonceGroup, long nonce) {
@@ -3724,6 +3746,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.atomic = atomic;
this.nonceGroup = nonceGroup;
this.nonce = nonce;
+ this.regionReplicateEnable = region.regionReplicationSink.isPresent();
}
@Override
@@ -4140,17 +4163,115 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return walEdits;
}
+ /**
+ * Here is for HBASE-26993,in order to make the new framework for region replication could work
+ * for SKIP_WAL, we save the {@link Mutation} which {@link Mutation#getDurability} is
+ * {@link Durability#SKIP_WAL} in miniBatchOp.
+ */
+ @Override
+ protected void cacheSkipWALMutationForRegionReplication(
+ MiniBatchOperationInProgress<Mutation> miniBatchOp,
+ List<Pair<NonceKey, WALEdit>> nonceKeyAndWALEdits, Map<byte[], List<Cell>> familyCellMap) {
+ if (!this.regionReplicateEnable) {
+ return;
+ }
+
+ WALEdit walEditForReplicateIfExistsSkipWAL =
+ miniBatchOp.getWalEditForReplicateIfExistsSkipWAL();
+ /**
+ * When there is a SKIP_WAL {@link Mutation},we create a new {@link WALEdit} for replicating
+ * to region replica,first we fill the existing {@link WALEdit} to it and then add the
+ * {@link Mutation} which is SKIP_WAL to it.
+ */
+ if (walEditForReplicateIfExistsSkipWAL == null) {
+ walEditForReplicateIfExistsSkipWAL =
+ this.createWALEditForReplicateSkipWAL(miniBatchOp, nonceKeyAndWALEdits);
+ miniBatchOp.setWalEditForReplicateIfExistsSkipWAL(walEditForReplicateIfExistsSkipWAL);
+ }
+ walEditForReplicateIfExistsSkipWAL.add(familyCellMap);
+
+ }
+
+ private WALEdit createWALEditForReplicateSkipWAL(
+ MiniBatchOperationInProgress<Mutation> miniBatchOp,
+ List<Pair<NonceKey, WALEdit>> nonceKeyAndWALEdits) {
+ if (nonceKeyAndWALEdits.isEmpty()) {
+ return this.createWALEdit(miniBatchOp);
+ }
+ // for MutationBatchOperation, more than one nonce is not allowed
+ assert nonceKeyAndWALEdits.size() == 1;
+ WALEdit currentWALEdit = nonceKeyAndWALEdits.get(0).getSecond();
+ return new WALEdit(currentWALEdit);
+ }
+
+    @Override
+    protected void addNonSkipWALMutationsToWALEdit(
+        final MiniBatchOperationInProgress<Mutation> miniBatchOp, WALEdit walEdit,
+        List<Cell> cellsFromCP, Map<byte[], List<Cell>> familyCellMap) {
+
+      super.addNonSkipWALMutationsToWALEdit(miniBatchOp, walEdit, cellsFromCP, familyCellMap);
+      WALEdit walEditForReplicateIfExistsSkipWAL =
+          miniBatchOp.getWalEditForReplicateIfExistsSkipWAL();
+      if (walEditForReplicateIfExistsSkipWAL == null) {
+        return;
+      }
+      /**
+       * When walEditForReplicateIfExistsSkipWAL is not null, there exists a SKIP_WAL
+       * {@link Mutation} and a new {@link WALEdit} was created in
+       * {@link MutationBatchOperation#cacheSkipWALMutationForRegionReplication} for
+       * replicating to the region replica, so here we also add the non SKIP_WAL {@link Mutation}s
+       * (and the coprocessor cells) to walEditForReplicateIfExistsSkipWAL.
+       */
+      doAddCellsToWALEdit(walEditForReplicateIfExistsSkipWAL, cellsFromCP, familyCellMap);
+    }
+
@Override
public WriteEntry writeMiniBatchOperationsToMemStore(
- final MiniBatchOperationInProgress<Mutation> miniBatchOp, @Nullable WriteEntry writeEntry)
- throws IOException {
+ final MiniBatchOperationInProgress<Mutation> miniBatchOp, @Nullable WriteEntry writeEntry,
+ long now) throws IOException {
+ boolean newWriteEntry = false;
if (writeEntry == null) {
writeEntry = region.mvcc.begin();
+ newWriteEntry = true;
}
super.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry.getWriteNumber());
+ if (newWriteEntry) {
+ /**
+ * Here is for HBASE-26993 case 2,all {@link Mutation}s are {@link Durability#SKIP_WAL}. In
+ * order to make the new framework for region replication could work for SKIP_WAL,because
+ * there is no {@link RegionReplicationSink#add} attached in {@link HRegion#doWALAppend},so
+ * here we get {@link WALEdit} from
+ * {@link MiniBatchOperationInProgress#getWalEditForReplicateIfExistsSkipWAL} and attach
+ * {@link RegionReplicationSink#add} to the new mvcc writeEntry.
+ */
+ attachRegionReplicationToMVCCEntry(miniBatchOp, writeEntry, now);
+ }
return writeEntry;
}
+ private WALKeyImpl createWALKey(long now) {
+ // for MutationBatchOperation,isReplay is false.
+ return this.region.createWALKeyForWALAppend(false, this, now, this.nonceGroup, this.nonce);
+ }
+
+ /**
+ * Create {@link WALKeyImpl} and get {@link WALEdit} from miniBatchOp and attach
+ * {@link RegionReplicationSink#add} to the mvccWriteEntry.
+ */
+ private void attachRegionReplicationToMVCCEntry(
+ final MiniBatchOperationInProgress<Mutation> miniBatchOp, WriteEntry mvccWriteEntry, long now)
+ throws IOException {
+ if (!this.regionReplicateEnable) {
+ return;
+ }
+ assert !mvccWriteEntry.getCompletionAction().isPresent();
+
+ final WALKeyImpl walKey = this.createWALKey(now);
+ walKey.setWriteEntry(mvccWriteEntry);
+ region.doAttachReplicateRegionReplicaAction(walKey,
+ miniBatchOp.getWalEditForReplicateIfExistsSkipWAL(), mvccWriteEntry);
+ }
+
@Override
public void completeMiniBatchOperations(
final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry)
@@ -4466,8 +4587,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public WriteEntry writeMiniBatchOperationsToMemStore(
- final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry)
- throws IOException {
+ final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry,
+ long now) throws IOException {
super.writeMiniBatchOperationsToMemStore(miniBatchOp, getOrigLogSeqNum());
return writeEntry;
}
@@ -4479,6 +4600,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
super.completeMiniBatchOperations(miniBatchOp, writeEntry);
region.mvcc.advanceTo(getOrigLogSeqNum());
}
+
+ @Override
+ protected void cacheSkipWALMutationForRegionReplication(
+ MiniBatchOperationInProgress<Mutation> miniBatchOp, List<Pair<NonceKey, WALEdit>> walEdits,
+ Map<byte[], List<Cell>> familyCellMap) {
+ // There is no action to do if current region is secondary replica
+ }
+
}
public OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic, long nonceGroup,
@@ -4647,8 +4776,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
NonceKey nonceKey = nonceKeyWALEditPair.getFirst();
if (walEdit != null && !walEdit.isEmpty()) {
- writeEntry = doWALAppend(walEdit, batchOp.durability, batchOp.getClusterIds(), now,
- nonceKey.getNonceGroup(), nonceKey.getNonce(), batchOp.getOrigLogSeqNum());
+ writeEntry = doWALAppend(walEdit, batchOp, miniBatchOp, now, nonceKey);
}
// Complete mvcc for all but last writeEntry (for replay case)
@@ -4660,7 +4788,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// STEP 5. Write back to memStore
// NOTE: writeEntry can be null here
- writeEntry = batchOp.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry);
+ writeEntry = batchOp.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry, now);
// STEP 6. Complete MiniBatchOperations: If required calls postBatchMutate() CP hook and
// complete mvcc for last writeEntry
@@ -7903,42 +8031,46 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}, () -> createRegionSpan("Region.increment"));
}
+ private WALKeyImpl createWALKeyForWALAppend(boolean isReplay, BatchOperation<?> batchOp, long now,
+ long nonceGroup, long nonce) {
+ WALKeyImpl walKey = isReplay
+ ? new WALKeyImpl(this.getRegionInfo().getEncodedNameAsBytes(),
+ this.htableDescriptor.getTableName(), SequenceId.NO_SEQUENCE_ID, now,
+ batchOp.getClusterIds(), nonceGroup, nonce, mvcc)
+ : new WALKeyImpl(this.getRegionInfo().getEncodedNameAsBytes(),
+ this.htableDescriptor.getTableName(), SequenceId.NO_SEQUENCE_ID, now,
+ batchOp.getClusterIds(), nonceGroup, nonce, mvcc, this.getReplicationScope());
+ if (isReplay) {
+ walKey.setOrigLogSeqNum(batchOp.getOrigLogSeqNum());
+ }
+ return walKey;
+ }
+
/**
* @return writeEntry associated with this append
*/
- private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID> clusterIds,
- long now, long nonceGroup, long nonce, long origLogSeqNum) throws IOException {
+ private WriteEntry doWALAppend(WALEdit walEdit, BatchOperation<?> batchOp,
+ MiniBatchOperationInProgress<Mutation> miniBatchOp, long now, NonceKey nonceKey)
+ throws IOException {
Preconditions.checkArgument(walEdit != null && !walEdit.isEmpty(), "WALEdit is null or empty!");
- Preconditions.checkArgument(!walEdit.isReplay() || origLogSeqNum != SequenceId.NO_SEQUENCE_ID,
+ Preconditions.checkArgument(
+ !walEdit.isReplay() || batchOp.getOrigLogSeqNum() != SequenceId.NO_SEQUENCE_ID,
"Invalid replay sequence Id for replay WALEdit!");
- // Using default cluster id, as this can only happen in the originating cluster.
- // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey
- // here instead of WALKeyImpl directly to support legacy coprocessors.
- WALKeyImpl walKey = walEdit.isReplay()
- ? new WALKeyImpl(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), SequenceId.NO_SEQUENCE_ID, now, clusterIds,
- nonceGroup, nonce, mvcc)
- : new WALKeyImpl(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), SequenceId.NO_SEQUENCE_ID, now, clusterIds,
- nonceGroup, nonce, mvcc, this.getReplicationScope());
- if (walEdit.isReplay()) {
- walKey.setOrigLogSeqNum(origLogSeqNum);
- }
+
+ WALKeyImpl walKey = createWALKeyForWALAppend(walEdit.isReplay(), batchOp, now,
+ nonceKey.getNonceGroup(), nonceKey.getNonce());
// don't call the coproc hook for writes to the WAL caused by
// system lifecycle events like flushes or compactions
if (this.coprocessorHost != null && !walEdit.isMetaEdit()) {
this.coprocessorHost.preWALAppend(walKey, walEdit);
}
- ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null);
try {
long txid = this.wal.appendData(this.getRegionInfo(), walKey, walEdit);
WriteEntry writeEntry = walKey.getWriteEntry();
- regionReplicationSink.ifPresent(sink -> writeEntry.attachCompletionAction(() -> {
- sink.add(walKey, walEdit, rpcCall);
- }));
+ this.attachRegionReplicationInWALAppend(batchOp, miniBatchOp, walKey, walEdit, writeEntry);
// Call sync on our edit.
if (txid != 0) {
- sync(txid, durability);
+ sync(txid, batchOp.durability);
}
return writeEntry;
} catch (IOException ioe) {
@@ -7947,7 +8079,51 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
throw ioe;
}
+ }
+  /**
+   * Attach {@link RegionReplicationSink#add} as a completion action of the mvcc writeEntry of a WAL
+   * append, for replicating the appended mutations to the region replicas.
+   */
+  private void attachRegionReplicationInWALAppend(BatchOperation<?> batchOp,
+    MiniBatchOperationInProgress<Mutation> miniBatchOp, WALKeyImpl walKey, WALEdit walEdit,
+    WriteEntry writeEntry) throws IOException {
+    if (!regionReplicationSink.isPresent()) {
+      return;
+    }
+    /**
+     * If {@link HRegion#regionReplicationSink} is present, only {@link MutationBatchOperation} is
+     * used, and {@link NonceKey} is the same for all {@link Mutation}s in a
+     * {@link MutationBatchOperation}. So for HBASE-26993 case 1: if
+     * {@link MiniBatchOperationInProgress#getWalEditForReplicateIfExistsSkipWAL} is not null and
+     * we still reach {@link HRegion#doWALAppend}, some (but not all) {@link Mutation}s are
+     * {@link Durability#SKIP_WAL}, so we replicate
+     * {@link MiniBatchOperationInProgress#getWalEditForReplicateIfExistsSkipWAL}, which holds both
+     * the SKIP_WAL and the non SKIP_WAL cells, to the region replica. If it is null, no
+     * {@link Mutation} is {@link Durability#SKIP_WAL}, so we just replicate walEdit, which is what
+     * was appended to the WAL.
+     */
+    assert batchOp instanceof MutationBatchOperation;
+    WALEdit walEditToUse = miniBatchOp.getWalEditForReplicateIfExistsSkipWAL();
+    if (walEditToUse == null) {
+      walEditToUse = walEdit;
+    }
+    doAttachReplicateRegionReplicaAction(walKey, walEditToUse, writeEntry);
+  }
+
+ /**
+ * Attach {@link RegionReplicationSink#add} to the mvcc writeEntry for replicating to region
+ * replica.
+ */
+ private void doAttachReplicateRegionReplicaAction(WALKeyImpl walKey, WALEdit walEdit,
+ WriteEntry writeEntry) throws IOException {
+ if (walEdit == null || walEdit.isEmpty()) {
+ return;
+ }
+ final ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null);
+ regionReplicationSink.ifPresent(sink -> writeEntry.attachCompletionAction(() -> {
+ sink.add(walKey, walEdit, rpcCall);
+ }));
}
public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HRegion.class, false);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
index 0fa18cddb6f..69fb426f394 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
@@ -48,6 +48,13 @@ public class MiniBatchOperationInProgress<T> {
private int numOfDeletes = 0;
private int numOfIncrements = 0;
private int numOfAppends = 0;
+  /**
+   * For HBASE-26993: if any {@link Mutation} in the batch is {@link Durability#SKIP_WAL}, this
+   * holds the cells of all the {@link Mutation}s collected in
+   * {@link HRegion.BatchOperation#buildWALEdits}, so that {@link HRegion#doMiniBatchMutate} can
+   * also replicate SKIP_WAL {@link Mutation}s to the region replica; otherwise it stays null.
+   */
+  private WALEdit walEditForReplicateIfExistsSkipWAL = null;
public MiniBatchOperationInProgress(T[] operations, OperationStatus[] retCodeDetails,
WALEdit[] walEditsFromCoprocessors, int firstIndex, int lastIndexExclusive,
@@ -182,4 +189,12 @@ public class MiniBatchOperationInProgress<T> {
public void incrementNumOfAppends() {
this.numOfAppends += 1;
}
+
+ public WALEdit getWalEditForReplicateIfExistsSkipWAL() {
+ return walEditForReplicateIfExistsSkipWAL;
+ }
+
+ public void setWalEditForReplicateIfExistsSkipWAL(WALEdit walEditForReplicateSkipWAL) {
+ this.walEditForReplicateIfExistsSkipWAL = walEditForReplicateSkipWAL;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java
index 6794c2d5bd2..c688f6b1de5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java
@@ -177,6 +177,19 @@ public class WALEdit implements HeapSize {
cells = new ArrayList<>(cellCount);
}
+  /**
+   * Shallow-copy an existing {@link WALEdit}: replay flag, cell list and family set are copied.
+   */
+  public WALEdit(WALEdit walEdit) {
+    this.replay = walEdit.replay;
+    cells = new ArrayList<>(walEdit.cells);
+    if (walEdit.families != null) {
+      this.families = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+      this.families.addAll(walEdit.families);
+    }
+
+  }
+
private Set<byte[]> getOrCreateFamilies() {
if (this.families == null) {
this.families = new TreeSet<>(Bytes.BYTES_COMPARATOR);
@@ -237,6 +250,17 @@ public class WALEdit implements HeapSize {
return add(cell, CellUtil.cloneFamily(cell));
}
+ @InterfaceAudience.Private
+ public WALEdit add(List<Cell> cells) {
+ if (cells == null || cells.isEmpty()) {
+ return this;
+ }
+ for (Cell cell : cells) {
+ add(cell);
+ }
+ return this;
+ }
+
public boolean isEmpty() {
return cells.isEmpty();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaReplicationError.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaReplicationError.java
index e7022cfaa1c..e0d7895e083 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaReplicationError.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaReplicationError.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -28,6 +29,7 @@ import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@@ -37,6 +39,7 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ConcurrentMapUtils;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -69,7 +72,8 @@ public class TestRegionReplicaReplicationError {
public static final class ErrorReplayRSRpcServices extends RSRpcServices {
- private final AtomicInteger count = new AtomicInteger(0);
+ private final ConcurrentHashMap<HRegion, AtomicInteger> regionToCounter =
+ new ConcurrentHashMap<HRegion, AtomicInteger>();
public ErrorReplayRSRpcServices(HRegionServer rs) throws IOException {
super(rs);
@@ -89,8 +93,12 @@ public class TestRegionReplicaReplicationError {
} catch (NotServingRegionException e) {
throw new ServiceException(e);
}
+
+ AtomicInteger counter =
+ ConcurrentMapUtils.computeIfAbsent(regionToCounter, region, () -> new AtomicInteger(0));
+
// fail the first several request
- if (region.getRegionInfo().getReplicaId() == 1 && count.addAndGet(entries.size()) < 100) {
+ if (region.getRegionInfo().getReplicaId() == 1 && counter.addAndGet(entries.size()) < 100) {
throw new ServiceException("Inject error!");
}
return super.replicateToReplica(controller, request);
@@ -112,7 +120,7 @@ public class TestRegionReplicaReplicationError {
private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
- private static TableName TN = TableName.valueOf("test");
+ private static String TN = "test";
private static byte[] CF = Bytes.toBytes("cf");
@@ -124,9 +132,6 @@ public class TestRegionReplicaReplicationError {
true);
HTU.startMiniCluster(
StartTestingClusterOption.builder().rsClass(RSForTest.class).numRegionServers(3).build());
- TableDescriptor td = TableDescriptorBuilder.newBuilder(TN).setRegionReplication(3)
- .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build();
- HTU.getAdmin().createTable(td);
}
@AfterClass
@@ -145,8 +150,26 @@ public class TestRegionReplicaReplicationError {
}
@Test
- public void test() throws IOException {
- try (Table table = HTU.getConnection().getTable(TN)) {
+ public void testDefaultDurability() throws IOException {
+ doTest(false);
+ }
+
+ @Test
+ public void testSkipWAL() throws IOException {
+ doTest(true);
+ }
+
+ private void doTest(boolean skipWAL) throws IOException {
+ TableName tableName = TableName.valueOf(TN + (skipWAL ? "_skipWAL" : ""));
+ TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
+ .setRegionReplication(3).setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF));
+ if (skipWAL) {
+ builder.setDurability(Durability.SKIP_WAL);
+ }
+ TableDescriptor td = builder.build();
+ HTU.getAdmin().createTable(td);
+
+ try (Table table = HTU.getConnection().getTable(tableName)) {
for (int i = 0; i < 500; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationForSkipWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationForSkipWAL.java
new file mode 100644
index 00000000000..e8880267118
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationForSkipWAL.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.regionreplication;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.StartTestingClusterOption;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Consistency;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, LargeTests.class })
+public class TestRegionReplicationForSkipWAL {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestRegionReplicationForSkipWAL.class);
+
+ private static final byte[] FAM1 = Bytes.toBytes("family_test1");
+
+ private static final byte[] QUAL1 = Bytes.toBytes("qualifier_test1");
+
+ private static final byte[] FAM2 = Bytes.toBytes("family_test2");
+
+ private static final byte[] QUAL2 = Bytes.toBytes("qualifier_test2");
+
+ private static final byte[] FAM3 = Bytes.toBytes("family_test3");
+
+ private static final byte[] QUAL3 = Bytes.toBytes("qualifier_test3");
+
+ private static final byte[] FAM4 = Bytes.toBytes("family_test4");
+
+ private static final byte[] QUAL4 = Bytes.toBytes("qualifier_test4");
+
+ private static final byte[] FAM5 = Bytes.toBytes("family_test5");
+
+ private static final byte[] QUAL5 = Bytes.toBytes("qualifier_test5");
+
+ private static final byte[] FAM6 = Bytes.toBytes("family_test6");
+
+ private static final byte[] QUAL6 = Bytes.toBytes("qualifier_test6");
+
+ private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
+ private static final int NB_SERVERS = 2;
+
+ private static final String strTableName = "TestRegionReplicationForSkipWAL";
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ Configuration conf = HTU.getConfiguration();
+ conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
+ conf.setBoolean(RegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false);
+ HTU.startMiniCluster(StartTestingClusterOption.builder().numRegionServers(NB_SERVERS).build());
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ HTU.shutdownMiniCluster();
+ }
+
+  /**
+   * This test is for HBASE-26993: make the new region replication framework introduced by
+   * HBASE-26233 work for tables whose DURABILITY is Durability.SKIP_WAL.
+   */
+  @Test
+  public void testReplicateToReplicaWhenSkipWAL() throws Exception {
+    final HRegion[] skipWALRegions = this.createTable(true);
+    byte[] rowKey1 = Bytes.toBytes(1);
+    byte[] value1 = Bytes.toBytes(2);
+
+    byte[] rowKey2 = Bytes.toBytes(2);
+    byte[] value2 = Bytes.toBytes(4);
+
+    // Case 1: the table itself is SKIP_WAL, so no Put in this batch is written to the WAL.
+    skipWALRegions[0].batchMutate(new Mutation[] { new Put(rowKey1).addColumn(FAM1, QUAL1, value1),
+      new Put(rowKey2).addColumn(FAM2, QUAL2, value2) });
+
+    try (Table skipWALTable = HTU.getConnection().getTable(getTableName(true))) {
+      HTU.waitFor(30000, () -> checkReplica(skipWALTable, FAM1, QUAL1, rowKey1, value1)
+          && checkReplica(skipWALTable, FAM2, QUAL2, rowKey2, value2));
+    }
+
+    byte[] rowKey3 = Bytes.toBytes(3);
+    byte[] value3 = Bytes.toBytes(6);
+    byte[] rowKey4 = Bytes.toBytes(4);
+    byte[] value4 = Bytes.toBytes(8);
+    byte[] rowKey5 = Bytes.toBytes(5);
+    byte[] value5 = Bytes.toBytes(10);
+    byte[] rowKey6 = Bytes.toBytes(6);
+    byte[] value6 = Bytes.toBytes(12);
+
+    // Case 2: the table uses default durability, but some Puts in the batch are SKIP_WAL.
+    final HRegion[] normalRegions = this.createTable(false);
+    normalRegions[0].batchMutate(new Mutation[] { new Put(rowKey3).addColumn(FAM3, QUAL3, value3),
+      new Put(rowKey4).addColumn(FAM4, QUAL4, value4).setDurability(Durability.SKIP_WAL),
+      new Put(rowKey5).addColumn(FAM5, QUAL5, value5).setDurability(Durability.SKIP_WAL),
+      new Put(rowKey6).addColumn(FAM6, QUAL6, value6) });
+
+    try (Table normalTable = HTU.getConnection().getTable(getTableName(false))) {
+      HTU.waitFor(30000,
+        () -> checkReplica(normalTable, FAM3, QUAL3, rowKey3, value3)
+          && checkReplica(normalTable, FAM4, QUAL4, rowKey4, value4)
+          && checkReplica(normalTable, FAM5, QUAL5, rowKey5, value5)
+          && checkReplica(normalTable, FAM6, QUAL6, rowKey6, value6));
+    }
+  }
+
+ private static boolean checkReplica(Table table, byte[] fam, byte[] qual, byte[] rowKey,
+ byte[] expectValue) throws IOException {
+ Get get = new Get(rowKey).setConsistency(Consistency.TIMELINE).setReplicaId(1);
+ Result result = table.get(get);
+ byte[] value = result.getValue(fam, qual);
+ return value != null && value.length > 0 && Arrays.equals(expectValue, value);
+ }
+
+ private TableName getTableName(boolean skipWAL) {
+ return TableName.valueOf(strTableName + (skipWAL ? "_skipWAL" : ""));
+ }
+
+ private HRegion[] createTable(boolean skipWAL) throws Exception {
+ TableName tableName = getTableName(skipWAL);
+ TableDescriptorBuilder builder =
+ TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(NB_SERVERS)
+ .setColumnFamilies(Arrays.asList(ColumnFamilyDescriptorBuilder.of(FAM1),
+ ColumnFamilyDescriptorBuilder.of(FAM2), ColumnFamilyDescriptorBuilder.of(FAM3),
+ ColumnFamilyDescriptorBuilder.of(FAM4), ColumnFamilyDescriptorBuilder.of(FAM5),
+ ColumnFamilyDescriptorBuilder.of(FAM6)));
+ if (skipWAL) {
+ builder.setDurability(Durability.SKIP_WAL);
+
+ }
+ TableDescriptor tableDescriptor = builder.build();
+
+ HTU.getAdmin().createTable(tableDescriptor);
+ final HRegion[] regions = new HRegion[NB_SERVERS];
+ for (int i = 0; i < NB_SERVERS; i++) {
+ HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
+ List<HRegion> onlineRegions = rs.getRegions(tableName);
+ for (HRegion region : onlineRegions) {
+ int replicaId = region.getRegionInfo().getReplicaId();
+ assertTrue(regions[replicaId] == null);
+ regions[region.getRegionInfo().getReplicaId()] = region;
+ }
+ }
+ for (Region region : regions) {
+ assertNotNull(region);
+ }
+ return regions;
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
index 0a07f06cc76..777232f68e9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
@@ -31,8 +31,10 @@ import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
@@ -96,16 +98,22 @@ public class TestRegionReplicaReplication {
HTU.shutdownMiniCluster();
}
- private void testRegionReplicaReplication(int regionReplication) throws Exception {
+ private void testRegionReplicaReplication(int regionReplication, boolean skipWAL)
+ throws Exception {
// test region replica replication. Create a table with single region, write some data
// ensure that data is replicated to the secondary region
- TableName tableName =
- TableName.valueOf("testRegionReplicaReplicationWithReplicas_" + regionReplication);
- TableDescriptor htd = HTU
- .createModifyableTableDescriptor(TableName.valueOf(tableName.toString()),
- ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
- ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
- .setRegionReplication(regionReplication).build();
+ TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_"
+ + regionReplication + (skipWAL ? "_skipWAL" : ""));
+ TableDescriptorBuilder builder =
+ HTU
+ .createModifyableTableDescriptor(TableName.valueOf(tableName.toString()),
+ ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
+ ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
+ .setRegionReplication(regionReplication);
+ if (skipWAL) {
+ builder.setDurability(Durability.SKIP_WAL);
+ }
+ TableDescriptor htd = builder.build();
createOrEnableTableWithRetries(htd, true);
TableName tableNameNoReplicas =
TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS");
@@ -171,17 +179,20 @@ public class TestRegionReplicaReplication {
@Test
public void testRegionReplicaReplicationWith2Replicas() throws Exception {
- testRegionReplicaReplication(2);
+ testRegionReplicaReplication(2, false);
+ testRegionReplicaReplication(2, true);
}
@Test
public void testRegionReplicaReplicationWith3Replicas() throws Exception {
- testRegionReplicaReplication(3);
+ testRegionReplicaReplication(3, false);
+ testRegionReplicaReplication(3, true);
}
@Test
public void testRegionReplicaReplicationWith10Replicas() throws Exception {
- testRegionReplicaReplication(10);
+ testRegionReplicaReplication(10, false);
+ testRegionReplicaReplication(10, true);
}
@Test