You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/11/22 13:50:35 UTC
[hbase] branch HBASE-26233 updated: HBASE-26416 Implement a new method for region replication instead of using replay (#3864)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HBASE-26233
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-26233 by this push:
new 5848844 HBASE-26416 Implement a new method for region replication instead of using replay (#3864)
5848844 is described below
commit 584884486bffadf139949519e125b48e5918f92a
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Mon Nov 22 21:50:02 2021 +0800
HBASE-26416 Implement a new method for region replication instead of using replay (#3864)
Signed-off-by: Xiaolin Ha <ha...@apache.org>
---
.../java/org/apache/hadoop/hbase/HConstants.java | 6 +
.../src/main/protobuf/server/region/Admin.proto | 5 +
.../AsyncRegionReplicationRetryingCaller.java | 2 +-
.../hadoop/hbase/master/MasterRpcServices.java | 6 +
.../apache/hadoop/hbase/regionserver/HRegion.java | 328 ++++++++++++++---
.../hadoop/hbase/regionserver/RSRpcServices.java | 67 +++-
.../RegionReplicationFlushRequester.java | 1 +
.../regionreplication/RegionReplicationSink.java | 87 ++---
.../org/apache/hadoop/hbase/wal/WALSplitUtil.java | 2 +
.../hadoop/hbase/master/MockRegionServer.java | 6 +
.../regionserver/TestHRegionReplayEvents.java | 2 +-
.../hbase/regionserver/TestReplicateToReplica.java | 388 +++++++++++++++++++++
.../TestRegionReplicationSink.java | 91 +----
.../TestMetaRegionReplicaReplication.java | 1 -
.../regionserver/TestRegionReplicaReplication.java | 1 -
15 files changed, 795 insertions(+), 198 deletions(-)
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 6464158..14701f5 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1192,7 +1192,13 @@ public final class HConstants {
public static final int PRIORITY_UNSET = -1;
public static final int NORMAL_QOS = 0;
public static final int REPLICATION_QOS = 5;
+ /**
+ * @deprecated since 3.0.0, will be removed in 4.0.0. DLR has been purged for a long time and
+ * region replication has its own 'replay' method.
+ */
+ @Deprecated
public static final int REPLAY_QOS = 6;
+ public static final int REGION_REPLICATION_QOS = REPLAY_QOS;
public static final int QOS_THRESHOLD = 10;
public static final int ADMIN_QOS = 100;
public static final int HIGH_QOS = 200;
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
index 0667292..89b9985 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
@@ -360,6 +360,11 @@ service AdminService {
returns(ReplicateWALEntryResponse);
rpc Replay(ReplicateWALEntryRequest)
+ returns(ReplicateWALEntryResponse) {
+ option deprecated = true;
+ };
+
+ rpc ReplicateToReplica(ReplicateWALEntryRequest)
returns(ReplicateWALEntryResponse);
rpc RollWALWriter(RollWALWriterRequest)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java
index a0ce418..c854ba3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java
@@ -67,7 +67,7 @@ public class AsyncRegionReplicationRetryingCaller extends AsyncRpcRetryingCaller
.buildReplicateWALEntryRequest(entries, replica.getEncodedNameAsBytes(), null, null, null);
resetCallTimeout();
controller.setCellScanner(pair.getSecond());
- stub.replay(controller, pair.getFirst(), r -> {
+ stub.replicateToReplica(controller, pair.getFirst(), r -> {
if (controller.failed()) {
onError(controller.getFailed(),
() -> "Call to " + loc.getServerName() + " for " + replica + " failed",
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 46bc8c2..d231796 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -3492,4 +3492,10 @@ public class MasterRpcServices extends HBaseRpcServicesBase<HMaster>
.forEach(builder::addServer);
return builder.build();
}
+
+ @Override
+ public ReplicateWALEntryResponse replicateToReplica(RpcController controller,
+ ReplicateWALEntryRequest request) throws ServiceException {
+ throw new ServiceException(new DoNotRetryIOException("Unsupported method on master"));
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 92b8734..eaee74b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -65,6 +65,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -197,6 +198,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
@@ -355,7 +357,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private Path regionDir;
private FileSystem walFS;
- // set to true if the region is restored from snapshot
+ // set to true if the region is restored from snapshot for reading by ClientSideRegionScanner
private boolean isRestoredRegion = false;
public void setRestoredRegion(boolean restoredRegion) {
@@ -411,9 +413,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// The following map is populated when opening the region
Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ // lock used to protect the replay operation for secondary replicas, so the below two fields do
+ // not need to be volatile.
+ private Lock replayLock;
+
/** Saved state from replaying prepare flush cache */
private PrepareFlushResult prepareFlushResult = null;
+ private long lastReplayedSequenceId = HConstants.NO_SEQNUM;
+
private volatile ConfigurationManager configurationManager;
// Used for testing.
@@ -1072,7 +1080,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
LOG.debug("Failed to clean up wrong region WAL directory {}", wrongRegionWALDir);
}
}
+ } else {
+ lastReplayedSequenceId = nextSeqId - 1;
+ replayLock = new ReentrantLock();
}
+ initializeRegionReplicationSink(reporter, status);
}
LOG.info("Opened {}; next sequenceid={}; {}, {}", this.getRegionInfo().getShortNameToLog(),
@@ -1087,7 +1099,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
status.setStatus("Running coprocessor post-open hooks");
coprocessorHost.postOpen();
}
- initializeRegionReplicationSink(reporter, status);
status.markComplete("Region opened successfully");
return nextSeqId;
}
@@ -1241,6 +1252,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor(
RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), mvcc.getReadPoint(),
getRegionServerServices().getServerName(), storeFiles);
+ // we do not care about the region close event at the secondary replica side, so just pass a
+ // null RegionReplicationSink
WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionEventDesc,
mvcc, null);
@@ -1683,7 +1696,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
}
}
-
+ if (regionReplicationSink.isPresent()) {
+ // stop replicating to secondary replicas
+ // the open event marker can make secondary replicas refresh store files and catch up
+ // everything, so here we just give up replicating later edits, to speed up the reopen process
+ RegionReplicationSink sink = regionReplicationSink.get();
+ sink.stop();
+ try {
+ regionReplicationSink.get().waitUntilStopped();
+ } catch (InterruptedException e) {
+ throw throwOnInterrupt(e);
+ }
+ }
// Set the closing flag
// From this point new arrivals at the region lock will get NSRE.
@@ -1887,16 +1911,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
RegionReplicaUtil.isDefaultReplica(getRegionInfo())) {
writeRegionCloseMarker(wal);
}
- if (regionReplicationSink.isPresent()) {
- // stop replicating to secondary replicas
- RegionReplicationSink sink = regionReplicationSink.get();
- sink.stop();
- try {
- regionReplicationSink.get().waitUntilStopped();
- } catch (InterruptedException e) {
- throw throwOnInterrupt(e);
- }
- }
this.closed.set(true);
if (!canFlush) {
decrMemStoreSize(this.memStoreSizing.getMemStoreSize());
@@ -2857,7 +2871,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
getRegionInfo(), flushOpSeqId, committedFiles);
// No sync. Sync is below where no updates lock and we do FlushAction.COMMIT_FLUSH
WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false,
- mvcc, null);
+ mvcc, regionReplicationSink.orElse(null));
}
// Prepare flush (take a snapshot)
@@ -2955,7 +2969,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/**
* Writes a marker to WAL indicating a flush is requested but cannot be complete due to various
* reasons. Ignores exceptions from WAL. Returns whether the write succeeded.
- * @param wal
* @return whether WAL write was successful
*/
private boolean writeFlushRequestMarkerToWAL(WAL wal, boolean writeFlushWalMarker) {
@@ -2964,11 +2977,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
getRegionInfo(), -1, new TreeMap<>(Bytes.BYTES_COMPARATOR));
try {
WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true, mvcc,
- null);
+ regionReplicationSink.orElse(null));
return true;
} catch (IOException e) {
- LOG.warn(getRegionInfo().getEncodedName() + " : "
- + "Received exception while trying to write the flush request to wal", e);
+ LOG.warn(getRegionInfo().getEncodedName() + " : " +
+ "Received exception while trying to write the flush request to wal", e);
}
}
return false;
@@ -4413,7 +4426,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/**
* Batch of mutations for replay. Base class is shared with {@link MutationBatchOperation} as most
* of the logic is same.
+ * @deprecated Since 3.0.0, will be removed in 4.0.0. Now we will not use this operation to apply
+ * edits at secondary replica side.
*/
+ @Deprecated
private static final class ReplayBatchOperation extends BatchOperation<MutationReplay> {
private long origLogSeqNum = 0;
@@ -4551,8 +4567,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
() -> createRegionSpan("Region.batchMutate"));
}
- public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId)
- throws IOException {
+ /**
+ * @deprecated Since 3.0.0, will be removed in 4.0.0. Now we use
+ * {@link #replayWALEntry(WALEntry, CellScanner)} for replaying edits at secondary
+ * replica side.
+ */
+ @Deprecated
+ OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId) throws IOException {
if (!RegionReplicaUtil.isDefaultReplica(getRegionInfo())
&& replaySeqId < lastReplayedOpenRegionSeqId) {
// if it is a secondary replica we should ignore these entries silently
@@ -5708,9 +5729,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
+ /**
+ * @deprecated Since 3.0.0, will be removed in 4.0.0. Kept only for compatibility with the old
+ * region replica implementation.
+ */
+ @Deprecated
void replayWALFlushMarker(FlushDescriptor flush, long replaySeqId) throws IOException {
- checkTargetRegion(flush.getEncodedRegionName().toByteArray(),
- "Flush marker from WAL ", flush);
+ checkTargetRegion(flush.getEncodedRegionName().toByteArray(), "Flush marker from WAL ", flush);
if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
return; // if primary nothing to do
@@ -5750,25 +5775,34 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
- /** Replay the flush marker from primary region by creating a corresponding snapshot of
- * the store memstores, only if the memstores do not have a higher seqId from an earlier wal
- * edit (because the events may be coming out of order).
- */
- PrepareFlushResult replayWALFlushStartMarker(FlushDescriptor flush) throws IOException {
- long flushSeqId = flush.getFlushSequenceNumber();
-
- HashSet<HStore> storesToFlush = new HashSet<>();
- for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
+ private Collection<HStore> getStoresToFlush(FlushDescriptor flushDesc) {
+ List<HStore> storesToFlush = new ArrayList<>();
+ for (StoreFlushDescriptor storeFlush : flushDesc.getStoreFlushesList()) {
byte[] family = storeFlush.getFamilyName().toByteArray();
HStore store = getStore(family);
if (store == null) {
- LOG.warn(getRegionInfo().getEncodedName() + " : "
- + "Received a flush start marker from primary, but the family is not found. Ignoring"
- + " StoreFlushDescriptor:" + TextFormat.shortDebugString(storeFlush));
+ LOG.warn(getRegionInfo().getEncodedName() + " : " +
+ "Received a flush start marker from primary, but the family is not found. Ignoring" +
+ " StoreFlushDescriptor:" + TextFormat.shortDebugString(storeFlush));
continue;
}
storesToFlush.add(store);
}
+ return storesToFlush;
+ }
+
+ /**
+ * Replay the flush marker from primary region by creating a corresponding snapshot of the store
+ * memstores, only if the memstores do not have a higher seqId from an earlier wal edit (because
+ * the events may be coming out of order).
+ * @deprecated Since 3.0.0, will be removed in 4.0.0. Kept only for compatibility with the old
+ * region replica implementation.
+ */
+ @Deprecated
+ PrepareFlushResult replayWALFlushStartMarker(FlushDescriptor flush) throws IOException {
+ long flushSeqId = flush.getFlushSequenceNumber();
+
+ Collection<HStore> storesToFlush = getStoresToFlush(flush);
MonitoredTask status = TaskMonitor.get().createStatus("Preparing flush " + this);
@@ -5864,6 +5898,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return null;
}
+ /**
+ * @deprecated Since 3.0.0, will be removed in 4.0.0. Kept only for compatibility with the old
+ * region replica implementation.
+ */
+ @Deprecated
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
justification="Intentional; post memstore flush")
void replayWALFlushCommitMarker(FlushDescriptor flush) throws IOException {
@@ -5985,11 +6024,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/**
* Replays the given flush descriptor by opening the flush files in stores and dropping the
* memstore snapshots if requested.
- * @param flush
- * @param prepareFlushResult
- * @param dropMemstoreSnapshot
- * @throws IOException
+ * @deprecated Since 3.0.0, will be removed in 4.0.0. Kept only for compatibility with the old
+ * region replica implementation.
*/
+ @Deprecated
private void replayFlushInStores(FlushDescriptor flush, PrepareFlushResult prepareFlushResult,
boolean dropMemstoreSnapshot)
throws IOException {
@@ -6083,7 +6121,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/**
* Drops the memstore contents after replaying a flush descriptor or region open event replay
* if the memstore edits have seqNums smaller than the given seq id
- * @throws IOException
*/
private MemStoreSize dropMemStoreContentsForSeqId(long seqId, HStore store) throws IOException {
MemStoreSizing totalFreedSize = new NonThreadSafeMemStoreSizing();
@@ -6155,8 +6192,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return prepareFlushResult;
}
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
- justification="Intentional; cleared the memstore")
+ /**
+ * @deprecated Since 3.0.0, will be removed in 4.0.0. Kept only for compatibility with the old
+ * region replica implementation.
+ */
+ @Deprecated
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY",
+ justification = "Intentional; cleared the memstore")
void replayWALRegionEventMarker(RegionEventDescriptor regionEvent) throws IOException {
checkTargetRegion(regionEvent.getEncodedRegionName().toByteArray(),
"RegionEvent marker from WAL ", regionEvent);
@@ -6273,6 +6315,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
+ /**
+ * @deprecated Since 3.0.0, will be removed in 4.0.0. Kept only for compatibility with the old
+ * region replica implementation.
+ */
+ @Deprecated
void replayWALBulkLoadEventMarker(WALProtos.BulkLoadDescriptor bulkLoadEvent) throws IOException {
checkTargetRegion(bulkLoadEvent.getEncodedRegionName().toByteArray(),
"BulkLoad marker from WAL ", bulkLoadEvent);
@@ -6354,6 +6401,205 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
/**
+ * Replay the batch mutate for secondary replica.
+ * <p/>
+ * We will directly apply the cells to the memstore. This is because:
+ * <ol>
+ * <li>All the cells are gotten from {@link WALEdit}, so we only have {@link Put} and
+ * {@link Delete} here</li>
+ * <li>The replay is single threaded, we do not need to acquire row lock, as the region is read
+ * only so no one else can write it.</li>
+ * <li>We do not need to write WAL.</li>
+ * <li>We will advance MVCC in the caller directly.</li>
+ * </ol>
+ */
+ private void replayWALBatchMutate(Map<byte[], List<Cell>> family2Cells) throws IOException {
+ startRegionOperation(Operation.REPLAY_BATCH_MUTATE);
+ try {
+ for (Map.Entry<byte[], List<Cell>> entry : family2Cells.entrySet()) {
+ applyToMemStore(getStore(entry.getKey()), entry.getValue(), false, memStoreSizing);
+ }
+ } finally {
+ closeRegionOperation(Operation.REPLAY_BATCH_MUTATE);
+ }
+ }
+
+ /**
+ * Replay the meta edits, i.e, flush marker, compaction marker, bulk load marker, region event
+ * marker, etc.
+ * <p/>
+ * For all events other than start flush, we will just call {@link #refreshStoreFiles()} as the
+ * logic is straight-forward and robust. For start flush, we need to snapshot the memstore, so
+ * later {@link #refreshStoreFiles()} call could drop the snapshot, otherwise we may run out of
+ * memory.
+ */
+ private void replayWALMetaEdit(Cell cell) throws IOException {
+ startRegionOperation(Operation.REPLAY_EVENT);
+ try {
+ FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(cell);
+ if (flushDesc != null) {
+ switch (flushDesc.getAction()) {
+ case START_FLUSH:
+ // for start flush, we need to take a snapshot of the current memstore
+ synchronized (writestate) {
+ if (!writestate.flushing) {
+ this.writestate.flushing = true;
+ } else {
+ // usually this should not happen but let's make the code more robust, it is not a
+ // big deal to just ignore it, the refreshStoreFiles call should have the ability to
+ // clean up the inconsistent state.
+ LOG.debug("NOT flushing {} as already flushing", getRegionInfo());
+ break;
+ }
+ }
+ MonitoredTask status =
+ TaskMonitor.get().createStatus("Preparing flush " + getRegionInfo());
+ Collection<HStore> storesToFlush = getStoresToFlush(flushDesc);
+ try {
+ PrepareFlushResult prepareResult =
+ internalPrepareFlushCache(null, flushDesc.getFlushSequenceNumber(), storesToFlush,
+ status, false, FlushLifeCycleTracker.DUMMY);
+ if (prepareResult.result == null) {
+ // save the PrepareFlushResult so that we can use it later from commit flush
+ this.prepareFlushResult = prepareResult;
+ status.markComplete("Flush prepare successful");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} prepared flush with seqId: {}", getRegionInfo(),
+ flushDesc.getFlushSequenceNumber());
+ }
+ } else {
+ // special case empty memstore. We will still save the flush result in this case,
+ // since our memstore is empty, but the primary is still flushing
+ if (prepareResult.getResult()
+ .getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
+ this.prepareFlushResult = prepareResult;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} prepared empty flush with seqId: {}", getRegionInfo(),
+ flushDesc.getFlushSequenceNumber());
+ }
+ }
+ status.abort("Flush prepare failed with " + prepareResult.result);
+ // nothing much to do. prepare flush failed because of some reason.
+ }
+ } finally {
+ status.cleanup();
+ }
+ break;
+ case ABORT_FLUSH:
+ // only reset the flushing flag here. An abort flush means the source region server will
+ // crash itself; after the primary region comes online, it will send us an open region
+ // marker, and then we can clean up the memstore.
+ synchronized (writestate) {
+ writestate.flushing = false;
+ }
+ break;
+ case COMMIT_FLUSH:
+ case CANNOT_FLUSH:
+ // just call refreshStoreFiles
+ refreshStoreFiles();
+ logRegionFiles();
+ synchronized (writestate) {
+ writestate.flushing = false;
+ }
+ break;
+ default:
+ LOG.warn("{} received a flush event with unknown action: {}", getRegionInfo(),
+ TextFormat.shortDebugString(flushDesc));
+ }
+ } else {
+ // for all other region events, we will do a refreshStoreFiles
+ refreshStoreFiles();
+ logRegionFiles();
+ }
+ } finally {
+ closeRegionOperation(Operation.REPLAY_EVENT);
+ }
+ }
+
+ /**
+ * Replay remote wal entry sent by primary replica.
+ * <p/>
+ * Should only call this method on secondary replicas.
+ */
+ void replayWALEntry(WALEntry entry, CellScanner cells) throws IOException {
+ long timeout = -1L;
+ Optional<RpcCall> call = RpcServer.getCurrentCall();
+ if (call.isPresent()) {
+ long deadline = call.get().getDeadline();
+ if (deadline < Long.MAX_VALUE) {
+ timeout = deadline - EnvironmentEdgeManager.currentTime();
+ if (timeout <= 0) {
+ throw new TimeoutIOException("Timeout while replaying edits for " + getRegionInfo());
+ }
+ }
+ }
+ if (timeout > 0) {
+ try {
+ if (!replayLock.tryLock(timeout, TimeUnit.MILLISECONDS)) {
+ throw new TimeoutIOException(
+ "Timeout while waiting for lock when replaying edits for " + getRegionInfo());
+ }
+ } catch (InterruptedException e) {
+ throw throwOnInterrupt(e);
+ }
+ } else {
+ replayLock.lock();
+ }
+ try {
+ int count = entry.getAssociatedCellCount();
+ long sequenceId = entry.getKey().getLogSequenceNumber();
+ if (lastReplayedSequenceId >= sequenceId) {
+ // we have already replayed this edit, skip
+ // remember to advance the CellScanner, as we may have multiple WALEntries and may still
+ // need to apply later WALEntries
+ for (int i = 0; i < count; i++) {
+ // Throw index out of bounds if our cell count is off
+ if (!cells.advance()) {
+ throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
+ }
+ }
+ return;
+ }
+ Map<byte[], List<Cell>> family2Cells = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ for (int i = 0; i < count; i++) {
+ // Throw index out of bounds if our cell count is off
+ if (!cells.advance()) {
+ throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
+ }
+ Cell cell = cells.current();
+ if (WALEdit.isMetaEditFamily(cell)) {
+ // If there is meta edit, i.e, we have done flush/compaction/open, then we need to apply
+ // the previous cells first, and then replay the special meta edit. The meta edit is like
+ // a barrier, so we need to keep the order. For example, the flush marker will contain a
+ // flush sequence number, which makes it possible for us to drop memstore content, but if
+ // we apply some edits which have greater sequence id first, then we can not drop the
+ // memstore content when replaying the flush marker, which is not good as we could run out
+ // of memory.
+ // And usually, a meta edit will have a special WALEntry for it, so this is just
+ // safeguard logic to make sure we do not break things in the worst case.
+ if (!family2Cells.isEmpty()) {
+ replayWALBatchMutate(family2Cells);
+ family2Cells.clear();
+ }
+ replayWALMetaEdit(cell);
+ } else {
+ family2Cells
+ .computeIfAbsent(CellUtil.cloneFamily(cell), k -> new ArrayList<>())
+ .add(cell);
+ }
+ }
+ // do not forget to apply the remaining cells
+ if (!family2Cells.isEmpty()) {
+ replayWALBatchMutate(family2Cells);
+ }
+ mvcc.advanceTo(sequenceId);
+ lastReplayedSequenceId = sequenceId;
+ } finally {
+ replayLock.unlock();
+ }
+ }
+
+ /**
* If all stores ended up dropping their snapshots, we can safely drop the prepareFlushResult
*/
private void dropPrepareFlushIfPossible() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 724da1a..22357f2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1088,15 +1088,15 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
/**
* Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
* constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
- * @param region
- * @param mutations
- * @param replaySeqId
* @return an array of OperationStatus which internally contains the OperationStatusCode and the
* exceptionMessage if any
- * @throws IOException
+ * @deprecated Since 3.0.0, will be removed in 4.0.0. We do not use this method for replaying
+ * edits for secondary replicas any more, see
+ * {@link #replicateToReplica(RpcController, ReplicateWALEntryRequest)}.
*/
- private OperationStatus [] doReplayBatchOp(final HRegion region,
- final List<MutationReplay> mutations, long replaySeqId) throws IOException {
+ @Deprecated
+ private OperationStatus[] doReplayBatchOp(final HRegion region,
+ final List<MutationReplay> mutations, long replaySeqId) throws IOException {
long before = EnvironmentEdgeManager.currentTime();
boolean batchContainsPuts = false, batchContainsDelete = false;
try {
@@ -2075,21 +2075,30 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
return response;
}
+ private CellScanner getAndReset(RpcController controller) {
+ HBaseRpcController hrc = (HBaseRpcController) controller;
+ CellScanner cells = hrc.cellScanner();
+ hrc.setCellScanner(null);
+ return cells;
+ }
+
/**
* Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
* that the given mutations will be durable on the receiving RS if this method returns without any
* exception.
* @param controller the RPC controller
* @param request the request
- * @throws ServiceException
+ * @deprecated Since 3.0.0, will be removed in 4.0.0. Not used any more, put here only for
+ * compatibility with old region replica implementation. Now we will use
+ * {@code replicateToReplica} method instead.
*/
+ @Deprecated
@Override
@QosPriority(priority = HConstants.REPLAY_QOS)
public ReplicateWALEntryResponse replay(final RpcController controller,
- final ReplicateWALEntryRequest request) throws ServiceException {
+ final ReplicateWALEntryRequest request) throws ServiceException {
long before = EnvironmentEdgeManager.currentTime();
- CellScanner cells = ((HBaseRpcController) controller).cellScanner();
- ((HBaseRpcController) controller).setCellScanner(null);
+ CellScanner cells = getAndReset(controller);
try {
checkOpen();
List<WALEntry> entries = request.getEntryList();
@@ -2176,6 +2185,41 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
}
}
+ /**
+ * Replay the given changes on a secondary replica
+ */
+ @Override
+ public ReplicateWALEntryResponse replicateToReplica(RpcController controller,
+ ReplicateWALEntryRequest request) throws ServiceException {
+ CellScanner cells = getAndReset(controller);
+ try {
+ checkOpen();
+ List<WALEntry> entries = request.getEntryList();
+ if (entries == null || entries.isEmpty()) {
+ // empty input
+ return ReplicateWALEntryResponse.newBuilder().build();
+ }
+ ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
+ HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
+ if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
+ throw new DoNotRetryIOException(
+ "Should not replicate to primary replica " + region.getRegionInfo() + ", CODE BUG?");
+ }
+ for (WALEntry entry : entries) {
+ if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
+ throw new NotServingRegionException(
+ "ReplicateToReplica request contains entries from multiple " +
+ "regions. First region:" + regionName.toStringUtf8() + " , other region:" +
+ entry.getKey().getEncodedRegionName());
+ }
+ region.replayWALEntry(entry, cells);
+ }
+ return ReplicateWALEntryResponse.newBuilder().build();
+ } catch (IOException ie) {
+ throw new ServiceException(ie);
+ }
+ }
+
private void checkShouldRejectReplicationRequest(List<WALEntry> entries) throws IOException {
ReplicationSourceService replicationSource = server.getReplicationSourceService();
if (replicationSource == null || entries.isEmpty()) {
@@ -2209,8 +2253,7 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
requestCount.increment();
List<WALEntry> entries = request.getEntryList();
checkShouldRejectReplicationRequest(entries);
- CellScanner cellScanner = ((HBaseRpcController) controller).cellScanner();
- ((HBaseRpcController) controller).setCellScanner(null);
+ CellScanner cellScanner = getAndReset(controller);
server.getRegionServerCoprocessorHost().preReplicateLogEntries();
server.getReplicationSinkService().replicateLogEntries(entries, cellScanner,
request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationFlushRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationFlushRequester.java
index 960f57e..3431324 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationFlushRequester.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationFlushRequester.java
@@ -120,6 +120,7 @@ class RegionReplicationFlushRequester {
}
// schedule a timer task
HashedWheelTimer timer = getTimer();
+ pendingFlushRequestSequenceId = sequenceId;
pendingFlushRequest =
timer.newTimeout(this::flush, minIntervalSecs - elapsedSecs, TimeUnit.SECONDS);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
index d5e2387..f0129b7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
@@ -22,7 +22,6 @@ import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -32,6 +31,7 @@ import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import org.agrona.collections.IntHashSet;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@@ -54,6 +54,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
/**
* The class for replicating WAL edits to secondary replicas, one instance per region.
@@ -128,7 +129,7 @@ public class RegionReplicationSink {
// when we get a flush all request, we will try to remove a replica from this map, the key point
// here is the flush sequence number must be greater than the failed sequence id, otherwise we
// should not remove the replica from this map
- private final Map<Integer, Long> failedReplicas = new HashMap<>();
+ private final IntHashSet failedReplicas;
private final Queue<SinkEntry> entries = new ArrayDeque<>();
@@ -165,6 +166,7 @@ public class RegionReplicationSink {
TimeUnit.MILLISECONDS.toNanos(conf.getLong(RPC_TIMEOUT_MS, RPC_TIMEOUT_MS_DEFAULT));
this.operationTimeoutNs = TimeUnit.MILLISECONDS
.toNanos(conf.getLong(OPERATION_TIMEOUT_MS, OPERATION_TIMEOUT_MS_DEFAULT));
+ this.failedReplicas = new IntHashSet(regionReplication - 1);
}
private void onComplete(List<SinkEntry> sent,
@@ -184,16 +186,16 @@ public class RegionReplicationSink {
if (error != null) {
if (maxSequenceId > lastFlushedSequenceId) {
LOG.warn(
- "Failed to replicate to secondary replica {} for {}, since the max sequence"
- + " id of sunk entris is {}, which is greater than the last flush SN {},"
- + " we will stop replicating for a while and trigger a flush",
+ "Failed to replicate to secondary replica {} for {}, since the max sequence" +
+ " id of sunk entris is {}, which is greater than the last flush SN {}," +
+ " we will stop replicating for a while and trigger a flush",
replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
failed.add(replicaId);
} else {
LOG.warn(
- "Failed to replicate to secondary replica {} for {}, since the max sequence"
- + " id of sunk entris is {}, which is less than or equal to the last flush SN {},"
- + " we will not stop replicating",
+ "Failed to replicate to secondary replica {} for {}, since the max sequence" +
+ " id of sunk entris is {}, which is less than or equal to the last flush SN {}," +
+ " we will not stop replicating",
replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
}
}
@@ -201,9 +203,7 @@ public class RegionReplicationSink {
synchronized (entries) {
pendingSize -= toReleaseSize;
if (!failed.isEmpty()) {
- for (Integer replicaId : failed) {
- failedReplicas.put(replicaId, maxSequenceId);
- }
+ failedReplicas.addAll(failed);
flushRequester.requestFlush(maxSequenceId);
}
sending = false;
@@ -237,7 +237,7 @@ public class RegionReplicationSink {
AtomicInteger remaining = new AtomicInteger(toSendReplicaCount);
Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
- if (failedReplicas.containsKey(replicaId)) {
+ if (failedReplicas.contains(replicaId)) {
continue;
}
MutableObject<Throwable> error = new MutableObject<>();
@@ -253,7 +253,15 @@ public class RegionReplicationSink {
}
}
- private boolean isFlushAllStores(FlushDescriptor flushDesc) {
+ private boolean isStartFlushAllStores(FlushDescriptor flushDesc) {
+ if (flushDesc.getAction() == FlushAction.CANNOT_FLUSH) {
+ // this means the memstore is empty, which means all data before this sequence id are flushed
+ // out, so it is equivalent to a flush all, return true
+ return true;
+ }
+ if (flushDesc.getAction() != FlushAction.START_FLUSH) {
+ return false;
+ }
Set<byte[]> storesFlushed =
flushDesc.getStoreFlushesList().stream().map(sfd -> sfd.getFamilyName().toByteArray())
.collect(Collectors.toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR)));
@@ -263,7 +271,7 @@ public class RegionReplicationSink {
return storesFlushed.containsAll(tableDesc.getColumnFamilyNames());
}
- private Optional<FlushDescriptor> getFlushAllDescriptor(Cell metaCell) {
+ private Optional<FlushDescriptor> getStartFlushAllDescriptor(Cell metaCell) {
if (!CellUtil.matchingFamily(metaCell, WALEdit.METAFAMILY)) {
return Optional.empty();
}
@@ -274,14 +282,14 @@ public class RegionReplicationSink {
LOG.warn("Failed to parse FlushDescriptor from {}", metaCell);
return Optional.empty();
}
- if (flushDesc != null && isFlushAllStores(flushDesc)) {
+ if (flushDesc != null && isStartFlushAllStores(flushDesc)) {
return Optional.of(flushDesc);
} else {
return Optional.empty();
}
}
- private void clearAllEntries() {
+ private long clearAllEntries() {
long toClearSize = 0;
for (SinkEntry entry : entries) {
toClearSize += entry.size;
@@ -290,20 +298,7 @@ public class RegionReplicationSink {
entries.clear();
pendingSize -= toClearSize;
manager.decrease(toClearSize);
- }
-
- private void clearFailedReplica(long flushSequenceNumber) {
- for (Iterator<Map.Entry<Integer, Long>> iter = failedReplicas.entrySet().iterator(); iter
- .hasNext();) {
- Map.Entry<Integer, Long> entry = iter.next();
- if (entry.getValue().longValue() < flushSequenceNumber) {
- LOG.debug(
- "Got a flush all request with sequence id {}, clear failed replica {}" +
- " with last failed sequence id {}",
- flushSequenceNumber, entry.getKey(), entry.getValue());
- iter.remove();
- }
- }
+ return toClearSize;
}
/**
@@ -325,32 +320,20 @@ public class RegionReplicationSink {
// check whether we flushed all stores, which means we could drop all the previous edits,
// and also, recover from the previous failure of some replicas
for (Cell metaCell : edit.getCells()) {
- getFlushAllDescriptor(metaCell).ifPresent(flushDesc -> {
+ getStartFlushAllDescriptor(metaCell).ifPresent(flushDesc -> {
long flushSequenceNumber = flushDesc.getFlushSequenceNumber();
- int toClearCount = 0;
- long toClearSize = 0;
- for (;;) {
- SinkEntry e = entries.peek();
- if (e == null) {
- break;
- }
- if (e.key.getSequenceId() < flushSequenceNumber) {
- entries.poll();
- toClearCount++;
- toClearSize += e.size;
- } else {
- break;
- }
- }
lastFlushedSequenceId = flushSequenceNumber;
+ long clearedCount = entries.size();
+ long clearedSize = clearAllEntries();
if (LOG.isDebugEnabled()) {
LOG.debug(
- "Got a flush all request with sequence id {}, clear {} pending"
- + " entries with size {}",
- flushSequenceNumber, toClearCount,
- StringUtils.TraditionalBinaryPrefix.long2String(toClearSize, "", 1));
+ "Got a flush all request with sequence id {}, clear {} pending" +
+ " entries with size {}, clear failed replicas {}",
+ flushSequenceNumber, clearedCount,
+ StringUtils.TraditionalBinaryPrefix.long2String(clearedSize, "", 1),
+ failedReplicas);
}
- clearFailedReplica(flushSequenceNumber);
+ failedReplicas.clear();
flushRequester.recordFlush(flushSequenceNumber);
});
}
@@ -371,7 +354,7 @@ public class RegionReplicationSink {
// failed
clearAllEntries();
for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
- failedReplicas.put(replicaId, entry.key.getSequenceId());
+ failedReplicas.add(replicaId);
}
flushRequester.requestFlush(entry.key.getSequenceId());
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
index 27c86c2..e84195b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
@@ -466,7 +466,9 @@ public final class WALSplitUtil {
* @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances
* extracted from the passed in WALEntry.
* @return list of Pair<MutationType, Mutation> to be replayed
+ * @deprecated Since 3.0.0, will be removed in 4.0.0.
*/
+ @Deprecated
public static List<MutationReplay> getMutationsFromWALEntry(AdminProtos.WALEntry entry,
CellScanner cells, Pair<WALKey, WALEdit> logEntry, Durability durability) throws IOException {
if (entry == null) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 56813af..4e5cd28 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -755,4 +755,10 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
public RegionReplicationBufferManager getRegionReplicationBufferManager() {
return null;
}
+
+ @Override
+ public ReplicateWALEntryResponse replicateToReplica(RpcController controller,
+ ReplicateWALEntryRequest request) throws ServiceException {
+ return null;
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index 1db4bb8..6214b31 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -62,7 +62,6 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.executor.ExecutorService;
-import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
@@ -113,6 +112,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescript
* Tests of HRegion methods for replaying flush, compaction, region open, etc events for secondary
* region replicas
*/
+@SuppressWarnings("deprecation")
@Category(LargeTests.class)
public class TestHRegionReplayEvents {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReplicateToReplica.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReplicateToReplica.java
new file mode 100644
index 0000000..d9f846d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReplicateToReplica.java
@@ -0,0 +1,388 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNameTestRule;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.executor.ExecutorType;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
+import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
+import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestReplicateToReplica {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestReplicateToReplica.class);
+
+ private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+ private static byte[] FAMILY = Bytes.toBytes("family");
+
+ private static byte[] QUAL = Bytes.toBytes("qualifier");
+
+ private static ExecutorService EXEC;
+
+ @Rule
+ public final TableNameTestRule name = new TableNameTestRule();
+
+ private TableName tableName;
+
+ private Path testDir;
+
+ private TableDescriptor td;
+
+ private RegionServerServices rss;
+
+ private AsyncClusterConnection conn;
+
+ private RegionReplicationBufferManager manager;
+
+ private FlushRequester flushRequester;
+
+ private HRegion primary;
+
+ private HRegion secondary;
+
+ private WALFactory walFactory;
+
+ private boolean queueReqAndResps;
+
+ private Queue<Pair<List<WAL.Entry>, CompletableFuture<Void>>> reqAndResps;
+
+ private static List<Put> TO_ADD_AFTER_PREPARE_FLUSH;
+
+ public static final class HRegionForTest extends HRegion {
+
+ public HRegionForTest(HRegionFileSystem fs, WAL wal, Configuration confParam,
+ TableDescriptor htd, RegionServerServices rsServices) {
+ super(fs, wal, confParam, htd, rsServices);
+ }
+
+ @SuppressWarnings("deprecation")
+ public HRegionForTest(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
+ RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
+ super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
+ }
+
+ @Override
+ protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid,
+ Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker,
+ FlushLifeCycleTracker tracker) throws IOException {
+ PrepareFlushResult result = super.internalPrepareFlushCache(wal, myseqid, storesToFlush,
+ status, writeFlushWalMarker, tracker);
+ for (Put put : TO_ADD_AFTER_PREPARE_FLUSH) {
+ put(put);
+ }
+ TO_ADD_AFTER_PREPARE_FLUSH.clear();
+ return result;
+ }
+
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() {
+ Configuration conf = UTIL.getConfiguration();
+ conf.setInt("hbase.region.read-replica.sink.flush.min-interval.secs", 1);
+ conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
+ conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
+ conf.setClass(HConstants.REGION_IMPL, HRegionForTest.class, HRegion.class);
+ EXEC = new ExecutorService("test");
+ EXEC.startExecutorService(EXEC.new ExecutorConfig().setCorePoolSize(1)
+ .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER));
+ ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
+ MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() {
+ EXEC.shutdown();
+ UTIL.cleanupTestDir();
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ TO_ADD_AFTER_PREPARE_FLUSH = new ArrayList<>();
+ tableName = name.getTableName();
+ testDir = UTIL.getDataTestDir(tableName.getNameAsString());
+ Configuration conf = UTIL.getConfiguration();
+ conf.set(HConstants.HBASE_DIR, testDir.toString());
+
+ td = TableDescriptorBuilder.newBuilder(tableName)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(2)
+ .setRegionMemStoreReplication(true).build();
+
+ reqAndResps = new ArrayDeque<>();
+ queueReqAndResps = true;
+ conn = mock(AsyncClusterConnection.class);
+ when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong())).thenAnswer(i -> {
+ if (queueReqAndResps) {
+ @SuppressWarnings("unchecked")
+ List<WAL.Entry> entries = i.getArgument(1, List.class);
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ reqAndResps.add(Pair.newPair(entries, future));
+ return future;
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ });
+
+ flushRequester = mock(FlushRequester.class);
+
+ rss = mock(RegionServerServices.class);
+ when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
+ when(rss.getConfiguration()).thenReturn(conf);
+ when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(conf));
+ when(rss.getExecutorService()).thenReturn(EXEC);
+ when(rss.getAsyncClusterConnection()).thenReturn(conn);
+ when(rss.getFlushRequester()).thenReturn(flushRequester);
+
+ manager = new RegionReplicationBufferManager(rss);
+ when(rss.getRegionReplicationBufferManager()).thenReturn(manager);
+
+ RegionInfo primaryHri = RegionInfoBuilder.newBuilder(td.getTableName()).build();
+ RegionInfo secondaryHri = RegionReplicaUtil.getRegionInfoForReplica(primaryHri, 1);
+
+ walFactory = new WALFactory(conf, UUID.randomUUID().toString());
+ WAL wal = walFactory.getWAL(primaryHri);
+ primary = HRegion.createHRegion(primaryHri, testDir, conf, td, wal);
+ primary.close();
+
+ primary = HRegion.openHRegion(testDir, primaryHri, td, wal, conf, rss, null);
+ secondary = HRegion.openHRegion(secondaryHri, td, null, conf, rss, null);
+
+ when(rss.getRegions()).then(i -> {
+ return Arrays.asList(primary, secondary);
+ });
+
+ // process the open events
+ replicateAll();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ // close region will issue a flush, which will enqueue an edit into the replication sink so we
+ // need to complete it otherwise the test will hang.
+ queueReqAndResps = false;
+ failAll();
+ HBaseTestingUtil.closeRegionAndWAL(primary);
+ HBaseTestingUtil.closeRegionAndWAL(secondary);
+ if (walFactory != null) {
+ walFactory.close();
+ }
+ }
+
+ private FlushResult flushPrimary() throws IOException {
+ return primary.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
+ }
+
+ private void replicate(Pair<List<WAL.Entry>, CompletableFuture<Void>> pair) throws IOException {
+ Pair<ReplicateWALEntryRequest, CellScanner> params = ReplicationProtobufUtil
+ .buildReplicateWALEntryRequest(pair.getFirst().toArray(new WAL.Entry[0]),
+ secondary.getRegionInfo().getEncodedNameAsBytes(), null, null, null);
+ for (WALEntry entry : params.getFirst().getEntryList()) {
+ secondary.replayWALEntry(entry, params.getSecond());
+ }
+ pair.getSecond().complete(null);
+ }
+
+ private void replicateOne() throws IOException {
+ replicate(reqAndResps.remove());
+ }
+
+ private void replicateAll() throws IOException {
+ for (Pair<List<WAL.Entry>, CompletableFuture<Void>> pair;;) {
+ pair = reqAndResps.poll();
+ if (pair == null) {
+ break;
+ }
+ replicate(pair);
+ }
+ }
+
+ private void failOne() {
+ reqAndResps.remove().getSecond().completeExceptionally(new IOException("Inject error"));
+ }
+
+ private void failAll() {
+ for (Pair<List<WAL.Entry>, CompletableFuture<Void>> pair;;) {
+ pair = reqAndResps.poll();
+ if (pair == null) {
+ break;
+ }
+ pair.getSecond().completeExceptionally(new IOException("Inject error"));
+ }
+ }
+
+ @Test
+ public void testNormalReplicate() throws IOException {
+ byte[] row = Bytes.toBytes(0);
+ primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
+ replicateOne();
+ assertEquals(1, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
+ }
+
+ @Test
+ public void testNormalFlush() throws IOException {
+ byte[] row = Bytes.toBytes(0);
+ primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
+ TO_ADD_AFTER_PREPARE_FLUSH.add(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(2)));
+ flushPrimary();
+ replicateAll();
+ assertEquals(2, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
+
+ // we should have the same memstore size, i.e., the secondary should have also dropped the
+ // snapshot
+ assertEquals(primary.getMemStoreDataSize(), secondary.getMemStoreDataSize());
+ }
+
+ @Test
+ public void testErrorBeforeFlushStart() throws IOException {
+ byte[] row = Bytes.toBytes(0);
+ primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
+ failOne();
+ verify(flushRequester, times(1)).requestFlush(any(), anyList(), any());
+ TO_ADD_AFTER_PREPARE_FLUSH.add(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(2)));
+ flushPrimary();
+ // this also tests start flush with empty memstore at secondary replica side
+ replicateAll();
+ assertEquals(2, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
+ assertEquals(primary.getMemStoreDataSize(), secondary.getMemStoreDataSize());
+ }
+
+ @Test
+ public void testErrorAfterFlushStartBeforeFlushCommit() throws IOException {
+ primary.put(new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
+ replicateAll();
+ TO_ADD_AFTER_PREPARE_FLUSH
+ .add(new Put(Bytes.toBytes(1)).addColumn(FAMILY, QUAL, Bytes.toBytes(2)));
+ flushPrimary();
+ // replicate the start flush edit
+ replicateOne();
+ // fail the remaining edits, the put and the commit flush edit
+ failOne();
+ verify(flushRequester, times(1)).requestFlush(any(), anyList(), any());
+ primary.put(new Put(Bytes.toBytes(2)).addColumn(FAMILY, QUAL, Bytes.toBytes(3)));
+ flushPrimary();
+ replicateAll();
+ for (int i = 0; i < 3; i++) {
+ assertEquals(i + 1,
+ Bytes.toInt(secondary.get(new Get(Bytes.toBytes(i))).getValue(FAMILY, QUAL)));
+ }
+ // should have nothing in memstore
+ assertEquals(0, secondary.getMemStoreDataSize());
+ }
+
+ @Test
+ public void testCatchUpWithCannotFlush() throws IOException, InterruptedException {
+ byte[] row = Bytes.toBytes(0);
+ primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
+ failOne();
+ verify(flushRequester, times(1)).requestFlush(any(), anyList(), any());
+ flushPrimary();
+ failAll();
+ Thread.sleep(2000);
+ // we will request flush the second time
+ verify(flushRequester, times(2)).requestFlush(any(), anyList(), any());
+ // we cannot flush because there is no content in the memstore
+ FlushResult result = flushPrimary();
+ assertEquals(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.getResult());
+ // the secondary replica does not have this row yet
+ assertFalse(secondary.get(new Get(row).setCheckExistenceOnly(true)).getExists().booleanValue());
+ // replicate the CANNOT_FLUSH edit
+ replicateOne();
+ // we should have the row now
+ assertEquals(1, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
+ }
+
+ @Test
+ public void testCatchUpWithReopen() throws IOException {
+ byte[] row = Bytes.toBytes(0);
+ primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
+ failOne();
+ primary.close();
+ // the secondary replica does not have this row yet, although the above close has flushed the
+ // data out
+ assertFalse(secondary.get(new Get(row).setCheckExistenceOnly(true)).getExists().booleanValue());
+
+ // reopen
+ primary = HRegion.openHRegion(testDir, primary.getRegionInfo(), td, primary.getWAL(),
+ UTIL.getConfiguration(), rss, null);
+ replicateAll();
+ // we should have the row now
+ assertEquals(1, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
index 76a224b..918f644 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
@@ -236,7 +236,7 @@ public class TestRegionReplicationSink {
throw new IllegalStateException();
}, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
FlushDescriptor fd =
- ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 2L, committedFiles);
+ ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, primary, 2L, committedFiles);
WALEdit edit2 = WALEdit.createFlushWALEdit(primary, fd);
sink.add(key2, edit2, rpcCall2);
@@ -300,7 +300,7 @@ public class TestRegionReplicationSink {
throw new IllegalStateException();
}, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
FlushDescriptor fd =
- ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 2L, committedFiles);
+ ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, primary, 2L, committedFiles);
WALEdit edit3 = WALEdit.createFlushWALEdit(primary, fd);
sink.add(key3, edit3, rpcCall3);
@@ -313,91 +313,4 @@ public class TestRegionReplicationSink {
// should have send out all so no pending entries.
assertEquals(0, sink.pendingSize());
}
-
- @Test
- public void testNotClearFailedReplica() {
- // simulate this scenario:
- // 1. prepare flush
- // 2. add one edit broken
- // 3. commit flush with flush sequence number less than the previous edit(this is the normal
- // case)
- // we should not clear the failed replica as we do not flush the broken edit out with this
- // flush, we need an extra flush to flush it out
- MutableInt next = new MutableInt(0);
- List<CompletableFuture<Void>> futures =
- Stream.generate(() -> new CompletableFuture<Void>()).limit(8).collect(Collectors.toList());
- when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
- .then(i -> futures.get(next.getAndIncrement()));
- when(manager.increase(anyLong())).thenReturn(true);
-
- ServerCall<?> rpcCall1 = mock(ServerCall.class);
- WALKeyImpl key1 = mock(WALKeyImpl.class);
- when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
- when(key1.getSequenceId()).thenReturn(1L);
- Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream()
- .collect(Collectors.toMap(Function.identity(), k -> Collections.emptyList(), (u, v) -> {
- throw new IllegalStateException();
- }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
- FlushDescriptor fd =
- ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, primary, 1L, committedFiles);
- WALEdit edit1 = WALEdit.createFlushWALEdit(primary, fd);
- sink.add(key1, edit1, rpcCall1);
-
- futures.get(0).complete(null);
- futures.get(1).complete(null);
-
- ServerCall<?> rpcCall2 = mock(ServerCall.class);
- WALKeyImpl key2 = mock(WALKeyImpl.class);
- when(key2.estimatedSerializedSizeOf()).thenReturn(200L);
- when(key2.getSequenceId()).thenReturn(2L);
- WALEdit edit2 = mock(WALEdit.class);
- when(edit2.estimatedSerializedSizeOf()).thenReturn(2000L);
- sink.add(key2, edit2, rpcCall2);
-
- // fail the call to replica 1
- futures.get(2).completeExceptionally(new IOException("inject error"));
- futures.get(3).complete(null);
-
- ServerCall<?> rpcCall3 = mock(ServerCall.class);
- WALKeyImpl key3 = mock(WALKeyImpl.class);
- when(key3.estimatedSerializedSizeOf()).thenReturn(300L);
- when(key3.getSequenceId()).thenReturn(3L);
- FlushDescriptor fd3 =
- ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 1L, committedFiles);
- WALEdit edit3 = WALEdit.createFlushWALEdit(primary, fd3);
- sink.add(key3, edit3, rpcCall3);
-
- // we should only call replicate once for edit3, since replica 1 is marked as failed, and the
- // flush request can not clean the failed replica since the flush sequence number is not greater
- // than sequence id of the last failed edit
- verify(conn, times(5)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
- futures.get(4).complete(null);
-
- ServerCall<?> rpcCall4 = mock(ServerCall.class);
- WALKeyImpl key4 = mock(WALKeyImpl.class);
- when(key4.estimatedSerializedSizeOf()).thenReturn(400L);
- when(key4.getSequenceId()).thenReturn(4L);
- WALEdit edit4 = mock(WALEdit.class);
- when(edit4.estimatedSerializedSizeOf()).thenReturn(4000L);
- sink.add(key4, edit4, rpcCall4);
-
- // still, only send to replica 2
- verify(conn, times(6)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
- futures.get(5).complete(null);
-
- ServerCall<?> rpcCall5 = mock(ServerCall.class);
- WALKeyImpl key5 = mock(WALKeyImpl.class);
- when(key5.estimatedSerializedSizeOf()).thenReturn(300L);
- when(key5.getSequenceId()).thenReturn(3L);
- FlushDescriptor fd5 =
- ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 4L, committedFiles);
- WALEdit edit5 = WALEdit.createFlushWALEdit(primary, fd5);
- sink.add(key5, edit5, rpcCall5);
-
- futures.get(6).complete(null);
- futures.get(7).complete(null);
- // should have cleared the failed replica because the flush sequence number is greater than than
- // the sequence id of the last failed edit
- verify(conn, times(8)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
- }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java
index a4da640..2fcfc29 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java
@@ -89,7 +89,6 @@ public class TestMetaRegionReplicaReplication {
conf.setInt("zookeeper.recovery.retry", 1);
conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
- conf.setInt("replication.stats.thread.period.seconds", 5);
conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
// Enable hbase:meta replication.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
index 231c9e1..ac279ed 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
@@ -85,7 +85,6 @@ public class TestRegionReplicaReplication {
conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
- conf.setInt("replication.stats.thread.period.seconds", 5);
conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // less number of retries is needed
conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);