You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/03/27 10:20:52 UTC
[06/50] [abbrv] hbase git commit: HBASE-20147 Serial replication will
be stuck if we create a table with serial replication but add it to a peer
after there are region moves
HBASE-20147 Serial replication will be stuck if we create a table with serial replication but add it to a peer after there are region moves
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/64061f89
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/64061f89
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/64061f89
Branch: refs/heads/HBASE-19064
Commit: 64061f896fe21512504e3886a400759e88b519da
Parents: aadb2f0
Author: zhangduo <zh...@apache.org>
Authored: Wed Mar 21 21:03:14 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Fri Mar 23 14:31:20 2018 +0800
----------------------------------------------------------------------
.../hadoop/hbase/AsyncMetaTableAccessor.java | 50 ++--
.../apache/hadoop/hbase/MetaTableAccessor.java | 26 ++-
.../src/main/protobuf/MasterProcedure.proto | 7 +-
.../replication/ReplicationQueueStorage.java | 8 +
.../hbase/replication/ReplicationUtils.java | 6 +-
.../replication/ZKReplicationQueueStorage.java | 50 ++--
.../master/replication/AddPeerProcedure.java | 21 +-
.../master/replication/ModifyPeerProcedure.java | 166 +++++++++++++-
.../replication/ReplicationPeerManager.java | 32 +--
.../replication/UpdatePeerConfigProcedure.java | 59 ++++-
.../regionserver/PeerProcedureHandlerImpl.java | 17 +-
.../regionserver/ReplicationSourceManager.java | 4 +-
.../replication/regionserver/WALEntryBatch.java | 8 +
.../replication/SerialReplicationTestBase.java | 229 +++++++++++++++++++
.../TestAddToSerialReplicationPeer.java | 215 +++++++++++++++++
.../replication/TestSerialReplication.java | 191 +---------------
16 files changed, 825 insertions(+), 264 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/64061f89/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java
index 05e60d4..13245d3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java
@@ -489,16 +489,17 @@ public class AsyncMetaTableAccessor {
QueryType type) {
return tableName.map((table) -> {
switch (type) {
- case REGION:
- byte[] startRow = new byte[table.getName().length + 2];
- System.arraycopy(table.getName(), 0, startRow, 0, table.getName().length);
- startRow[startRow.length - 2] = HConstants.DELIMITER;
- startRow[startRow.length - 1] = HConstants.DELIMITER;
- return startRow;
- case ALL:
- case TABLE:
- default:
- return table.getName();
+ case REGION:
+ case REPLICATION:
+ byte[] startRow = new byte[table.getName().length + 2];
+ System.arraycopy(table.getName(), 0, startRow, 0, table.getName().length);
+ startRow[startRow.length - 2] = HConstants.DELIMITER;
+ startRow[startRow.length - 1] = HConstants.DELIMITER;
+ return startRow;
+ case ALL:
+ case TABLE:
+ default:
+ return table.getName();
}
});
}
@@ -512,20 +513,21 @@ public class AsyncMetaTableAccessor {
return tableName.map((table) -> {
final byte[] stopRow;
switch (type) {
- case REGION:
- stopRow = new byte[table.getName().length + 3];
- System.arraycopy(table.getName(), 0, stopRow, 0, table.getName().length);
- stopRow[stopRow.length - 3] = ' ';
- stopRow[stopRow.length - 2] = HConstants.DELIMITER;
- stopRow[stopRow.length - 1] = HConstants.DELIMITER;
- break;
- case ALL:
- case TABLE:
- default:
- stopRow = new byte[table.getName().length + 1];
- System.arraycopy(table.getName(), 0, stopRow, 0, table.getName().length);
- stopRow[stopRow.length - 1] = ' ';
- break;
+ case REGION:
+ case REPLICATION:
+ stopRow = new byte[table.getName().length + 3];
+ System.arraycopy(table.getName(), 0, stopRow, 0, table.getName().length);
+ stopRow[stopRow.length - 3] = ' ';
+ stopRow[stopRow.length - 2] = HConstants.DELIMITER;
+ stopRow[stopRow.length - 1] = HConstants.DELIMITER;
+ break;
+ case ALL:
+ case TABLE:
+ default:
+ stopRow = new byte[table.getName().length + 1];
+ System.arraycopy(table.getName(), 0, stopRow, 0, table.getName().length);
+ stopRow[stopRow.length - 1] = ' ';
+ break;
}
return stopRow;
});
http://git-wip-us.apache.org/repos/asf/hbase/blob/64061f89/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index 8858e71..a2d0ae2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -192,7 +192,8 @@ public class MetaTableAccessor {
public enum QueryType {
ALL(HConstants.TABLE_FAMILY, HConstants.CATALOG_FAMILY),
REGION(HConstants.CATALOG_FAMILY),
- TABLE(HConstants.TABLE_FAMILY);
+ TABLE(HConstants.TABLE_FAMILY),
+ REPLICATION(HConstants.REPLICATION_BARRIER_FAMILY);
private final byte[][] families;
@@ -1168,8 +1169,9 @@ public class MetaTableAccessor {
final List<T> results = new ArrayList<>();
@Override
public boolean visit(Result r) throws IOException {
- if (r == null || r.isEmpty()) return true;
- add(r);
+ if (r != null && !r.isEmpty()) {
+ add(r);
+ }
return true;
}
@@ -2108,6 +2110,24 @@ public class MetaTableAccessor {
}
}
+ public static List<Pair<String, Long>> getTableEncodedRegionNameAndLastBarrier(Connection conn,
+ TableName tableName) throws IOException {
+ List<Pair<String, Long>> list = new ArrayList<>();
+ scanMeta(conn, getTableStartRowForMeta(tableName, QueryType.REPLICATION),
+ getTableStopRowForMeta(tableName, QueryType.REPLICATION), QueryType.REPLICATION, r -> {
+ byte[] value =
+ r.getValue(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER);
+ if (value == null) {
+ return true;
+ }
+ long lastBarrier = Bytes.toLong(value);
+ String encodedRegionName = RegionInfo.encodeRegionName(r.getRow());
+ list.add(Pair.newPair(encodedRegionName, lastBarrier));
+ return true;
+ });
+ return list;
+ }
+
private static void debugLogMutations(List<? extends Mutation> mutations) throws IOException {
if (!METALOG.isDebugEnabled()) {
return;
http://git-wip-us.apache.org/repos/asf/hbase/blob/64061f89/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index fa6fa75..f710759 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -377,7 +377,11 @@ enum PeerModificationState {
PRE_PEER_MODIFICATION = 1;
UPDATE_PEER_STORAGE = 2;
REFRESH_PEER_ON_RS = 3;
- POST_PEER_MODIFICATION = 4;
+ SERIAL_PEER_REOPEN_REGIONS = 4;
+ SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID = 5;
+ SERIAL_PEER_SET_PEER_ENABLED = 6;
+ SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS = 7;
+ POST_PEER_MODIFICATION = 8;
}
message PeerModificationStateData {
@@ -415,4 +419,5 @@ message AddPeerStateData {
message UpdatePeerConfigStateData {
required ReplicationPeer peer_config = 1;
+ optional ReplicationPeer old_peer_config = 2;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/64061f89/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
index cfe9c9c..99a1e97 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
@@ -79,6 +79,14 @@ public interface ReplicationQueueStorage {
long getLastSequenceId(String encodedRegionName, String peerId) throws ReplicationException;
/**
+ * Set the max sequence id of a bunch of regions for a given peer. Will be called when setting up
+ * a serial replication peer.
+ * @param peerId peer id
+ * @param lastSeqIds map with {encodedRegionName, sequenceId} pairs for serial replication.
+ */
+ void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds) throws ReplicationException;
+
+ /**
* Get the current position for a specific WAL in a given queue for a given regionserver.
* @param serverName the name of the regionserver
* @param queueId a String that identifies the queue
http://git-wip-us.apache.org/repos/asf/hbase/blob/64061f89/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index e2479e0..1c42de4 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -111,13 +111,11 @@ public final class ReplicationUtils {
return true;
}
- public static boolean isKeyConfigEqual(ReplicationPeerConfig rpc1, ReplicationPeerConfig rpc2) {
+ public static boolean isNamespacesAndTableCFsEqual(ReplicationPeerConfig rpc1,
+ ReplicationPeerConfig rpc2) {
if (rpc1.replicateAllUserTables() != rpc2.replicateAllUserTables()) {
return false;
}
- if (rpc1.isSerial() != rpc2.isSerial()) {
- return false;
- }
if (rpc1.replicateAllUserTables()) {
return isNamespacesEqual(rpc1.getExcludeNamespaces(), rpc2.getExcludeNamespaces()) &&
isTableCFsEqual(rpc1.getExcludeTableCFsMap(), rpc2.getExcludeTableCFsMap());
http://git-wip-us.apache.org/repos/asf/hbase/blob/64061f89/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index d4363db..e5a498a 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -203,6 +203,24 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
}
}
+ private void addLastSeqIdsToOps(String queueId, Map<String, Long> lastSeqIds,
+ List<ZKUtilOp> listOfOps) throws KeeperException {
+ for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
+ String peerId = new ReplicationQueueInfo(queueId).getPeerId();
+ String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
+ /*
+ * Make sure that the path
+ * /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id> exists. As the javadoc of
+ * the multiOrSequential() method says, if a NodeExistsException is received, all operations
+ * will fail. So create the path here. There is no need to add this operation to listOfOps,
+ * because we only need to make sure that the file position and sequence id are updated atomically.
+ */
+ ZKUtil.createWithParents(zookeeper, path);
+ // Persist the max sequence id of region to zookeeper.
+ listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
+ }
+ }
+
@Override
public void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
Map<String, Long> lastSeqIds) throws ReplicationException {
@@ -213,23 +231,8 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
ZKUtil.positionToByteArray(position)));
}
// Persist the max sequence id(s) of regions for serial replication atomically.
- for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
- String peerId = new ReplicationQueueInfo(queueId).getPeerId();
- String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
- /*
- * Make sure the existence of path
- * /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id>. As the javadoc in
- * multiOrSequential() method said, if received a NodeExistsException, all operations will
- * fail. So create the path here, and in fact, no need to add this operation to listOfOps,
- * because only need to make sure that update file position and sequence id atomically.
- */
- ZKUtil.createWithParents(zookeeper, path);
- // Persist the max sequence id of region to zookeeper.
- listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
- }
- if (!listOfOps.isEmpty()) {
- ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
- }
+ addLastSeqIdsToOps(queueId, lastSeqIds, listOfOps);
+ ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
} catch (KeeperException e) {
throw new ReplicationException("Failed to set log position (serverName=" + serverName
+ ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e);
@@ -257,6 +260,19 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
}
@Override
+ public void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds)
+ throws ReplicationException {
+ try {
+ List<ZKUtilOp> listOfOps = new ArrayList<>();
+ addLastSeqIdsToOps(peerId, lastSeqIds, listOfOps);
+ ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
+ } catch (KeeperException e) {
+ throw new ReplicationException("Failed to set last sequence ids, peerId=" + peerId +
+ ", lastSeqIds.size=" + lastSeqIds.size(), e);
+ }
+ }
+
+ @Override
public long getWALPosition(ServerName serverName, String queueId, String fileName)
throws ReplicationException {
byte[] bytes;
http://git-wip-us.apache.org/repos/asf/hbase/blob/64061f89/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
index f0f7704..72228f6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
@@ -57,6 +57,21 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
}
@Override
+ protected boolean reopenRegionsAfterRefresh() {
+ return true;
+ }
+
+ @Override
+ protected boolean enablePeerBeforeFinish() {
+ return enabled;
+ }
+
+ @Override
+ protected ReplicationPeerConfig getNewPeerConfig() {
+ return peerConfig;
+ }
+
+ @Override
protected void prePeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException {
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
@@ -68,11 +83,13 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
@Override
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
- env.getReplicationPeerManager().addPeer(peerId, peerConfig, enabled);
+ env.getReplicationPeerManager().addPeer(peerId, peerConfig,
+ peerConfig.isSerial() ? false : enabled);
}
@Override
- protected void postPeerModification(MasterProcedureEnv env) throws IOException {
+ protected void postPeerModification(MasterProcedureEnv env)
+ throws IOException, ReplicationException {
LOG.info("Successfully added {} peer {}, config {}", enabled ? "ENABLED" : "DISABLED", peerId,
peerConfig);
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
http://git-wip-us.apache.org/repos/asf/hbase/blob/64061f89/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
index 83c5134..2b76487 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
@@ -18,11 +18,28 @@
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.master.TableStateManager;
+import org.apache.hadoop.hbase.master.TableStateManager.TableStateNotFoundException;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +55,8 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
private static final Logger LOG = LoggerFactory.getLogger(ModifyPeerProcedure.class);
+ private static final int SET_LAST_SEQ_ID_BATCH_SIZE = 1000;
+
protected ModifyPeerProcedure() {
}
@@ -73,6 +92,114 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
ProcedurePrepareLatch.releaseLatch(latch, this);
}
+ /**
+ * Implementation classes can override this method. The default return value is false, which means
+ * we will jump to POST_PEER_MODIFICATION and finish the procedure. If it returns true, we will
+ * jump to SERIAL_PEER_REOPEN_REGIONS.
+ */
+ protected boolean reopenRegionsAfterRefresh() {
+ return false;
+ }
+
+ /**
+ * The implementation class should override this method if the procedure may enter the
+ * serial-related states.
+ */
+ protected boolean enablePeerBeforeFinish() {
+ throw new UnsupportedOperationException();
+ }
+
+ private void refreshPeer(MasterProcedureEnv env, PeerOperationType type) {
+ addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
+ .map(sn -> new RefreshPeerProcedure(peerId, type, sn))
+ .toArray(RefreshPeerProcedure[]::new));
+ }
+
+ protected ReplicationPeerConfig getOldPeerConfig() {
+ return null;
+ }
+
+ protected ReplicationPeerConfig getNewPeerConfig() {
+ throw new UnsupportedOperationException();
+ }
+
+ private Stream<TableDescriptor> getTables(MasterProcedureEnv env) throws IOException {
+ ReplicationPeerConfig peerConfig = getNewPeerConfig();
+ Stream<TableDescriptor> stream = env.getMasterServices().getTableDescriptors().getAll().values()
+ .stream().filter(TableDescriptor::hasGlobalReplicationScope)
+ .filter(td -> ReplicationUtils.contains(peerConfig, td.getTableName()));
+ ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
+ if (oldPeerConfig != null && oldPeerConfig.isSerial()) {
+ stream = stream.filter(td -> !ReplicationUtils.contains(oldPeerConfig, td.getTableName()));
+ }
+ return stream;
+ }
+
+ private void reopenRegions(MasterProcedureEnv env) throws IOException {
+ Stream<TableDescriptor> stream = getTables(env);
+ TableStateManager tsm = env.getMasterServices().getTableStateManager();
+ stream.filter(td -> {
+ try {
+ return tsm.getTableState(td.getTableName()).isEnabled();
+ } catch (TableStateNotFoundException e) {
+ return false;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }).forEach(td -> {
+ try {
+ addChildProcedure(env.getAssignmentManager().createReopenProcedures(
+ env.getAssignmentManager().getRegionStates().getRegionsOfTable(td.getTableName())));
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ }
+
+ private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
+ ReplicationQueueStorage queueStorage) throws ReplicationException {
+ if (barrier >= 0) {
+ lastSeqIds.put(encodedRegionName, barrier);
+ if (lastSeqIds.size() >= SET_LAST_SEQ_ID_BATCH_SIZE) {
+ queueStorage.setLastSequenceIds(peerId, lastSeqIds);
+ lastSeqIds.clear();
+ }
+ }
+ }
+
+ private void setLastSequenceIdForSerialPeer(MasterProcedureEnv env)
+ throws IOException, ReplicationException {
+ Stream<TableDescriptor> stream = getTables(env);
+ TableStateManager tsm = env.getMasterServices().getTableStateManager();
+ ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
+ Connection conn = env.getMasterServices().getConnection();
+ RegionStates regionStates = env.getAssignmentManager().getRegionStates();
+ MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
+ Map<String, Long> lastSeqIds = new HashMap<String, Long>();
+ stream.forEach(td -> {
+ try {
+ if (tsm.getTableState(td.getTableName()).isEnabled()) {
+ for (Pair<String, Long> name2Barrier : MetaTableAccessor
+ .getTableEncodedRegionNameAndLastBarrier(conn, td.getTableName())) {
+ addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
+ queueStorage);
+ }
+ } else {
+ for (RegionInfo region : regionStates.getRegionsOfTable(td.getTableName(), true)) {
+ long maxSequenceId =
+ WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
+ addToMap(lastSeqIds, region.getEncodedName(), maxSequenceId, queueStorage);
+ }
+ }
+ } catch (IOException | ReplicationException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ if (!lastSeqIds.isEmpty()) {
+ queueStorage.setLastSequenceIds(peerId, lastSeqIds);
+ }
+ }
+
@Override
protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
@@ -104,9 +231,42 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
return Flow.HAS_MORE_STATE;
case REFRESH_PEER_ON_RS:
- addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
- .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn))
- .toArray(RefreshPeerProcedure[]::new));
+ refreshPeer(env, getPeerOperationType());
+ setNextState(reopenRegionsAfterRefresh() ? PeerModificationState.SERIAL_PEER_REOPEN_REGIONS
+ : PeerModificationState.POST_PEER_MODIFICATION);
+ return Flow.HAS_MORE_STATE;
+ case SERIAL_PEER_REOPEN_REGIONS:
+ try {
+ reopenRegions(env);
+ } catch (Exception e) {
+ LOG.warn("{} reopen regions for peer {} failed, retry", getClass().getName(), peerId, e);
+ throw new ProcedureYieldException();
+ }
+ setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID);
+ return Flow.HAS_MORE_STATE;
+ case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID:
+ try {
+ setLastSequenceIdForSerialPeer(env);
+ } catch (Exception e) {
+ LOG.warn("{} set last sequence id for peer {} failed, retry", getClass().getName(),
+ peerId, e);
+ throw new ProcedureYieldException();
+ }
+ setNextState(enablePeerBeforeFinish() ? PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED
+ : PeerModificationState.POST_PEER_MODIFICATION);
+ return Flow.HAS_MORE_STATE;
+ case SERIAL_PEER_SET_PEER_ENABLED:
+ try {
+ env.getReplicationPeerManager().enablePeer(peerId);
+ } catch (ReplicationException e) {
+ LOG.warn("{} enable peer before finish for peer {} failed, retry", getClass().getName(),
+ peerId, e);
+ throw new ProcedureYieldException();
+ }
+ setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS);
+ return Flow.HAS_MORE_STATE;
+ case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS:
+ refreshPeer(env, PeerOperationType.ENABLE);
setNextState(PeerModificationState.POST_PEER_MODIFICATION);
return Flow.HAS_MORE_STATE;
case POST_PEER_MODIFICATION:
http://git-wip-us.apache.org/repos/asf/hbase/blob/64061f89/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 1e93373..a0e01e0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -85,7 +85,7 @@ public class ReplicationPeerManager {
}
}
- public void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
+ void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
throws DoNotRetryIOException, ReplicationException {
if (peerId.contains("-")) {
throw new DoNotRetryIOException("Found invalid peer name: " + peerId);
@@ -109,43 +109,47 @@ public class ReplicationPeerManager {
return desc;
}
- public void preRemovePeer(String peerId) throws DoNotRetryIOException {
+ void preRemovePeer(String peerId) throws DoNotRetryIOException {
checkPeerExists(peerId);
}
- public void preEnablePeer(String peerId) throws DoNotRetryIOException {
+ void preEnablePeer(String peerId) throws DoNotRetryIOException {
ReplicationPeerDescription desc = checkPeerExists(peerId);
if (desc.isEnabled()) {
throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled");
}
}
- public void preDisablePeer(String peerId) throws DoNotRetryIOException {
+ void preDisablePeer(String peerId) throws DoNotRetryIOException {
ReplicationPeerDescription desc = checkPeerExists(peerId);
if (!desc.isEnabled()) {
throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled");
}
}
- public void preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
+ /**
+ * Return the old peer description. Can never be null.
+ */
+ ReplicationPeerDescription preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
throws DoNotRetryIOException {
checkPeerConfig(peerConfig);
ReplicationPeerDescription desc = checkPeerExists(peerId);
ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
if (!isStringEquals(peerConfig.getClusterKey(), oldPeerConfig.getClusterKey())) {
throw new DoNotRetryIOException(
- "Changing the cluster key on an existing peer is not allowed. Existing key '" +
- oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" +
- peerConfig.getClusterKey() + "'");
+ "Changing the cluster key on an existing peer is not allowed. Existing key '" +
+ oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" +
+ peerConfig.getClusterKey() + "'");
}
if (!isStringEquals(peerConfig.getReplicationEndpointImpl(),
oldPeerConfig.getReplicationEndpointImpl())) {
throw new DoNotRetryIOException("Changing the replication endpoint implementation class " +
- "on an existing peer is not allowed. Existing class '" +
- oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId +
- " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'");
+ "on an existing peer is not allowed. Existing class '" +
+ oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId +
+ " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'");
}
+ return desc;
}
public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
@@ -216,7 +220,7 @@ public class ReplicationPeerManager {
return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
}
- public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
+ void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
// Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still
// on-going when the refresh peer config procedure is done, if a RS which has already been
// scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in
@@ -340,7 +344,7 @@ public class ReplicationPeerManager {
public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
throws ReplicationException {
ReplicationPeerStorage peerStorage =
- ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
+ ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
for (String peerId : peerStorage.listPeerIds()) {
ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
@@ -348,7 +352,7 @@ public class ReplicationPeerManager {
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig));
}
return new ReplicationPeerManager(peerStorage,
- ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);
+ ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/64061f89/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
index 3497447..b7e670a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +42,10 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
private ReplicationPeerConfig peerConfig;
+ private ReplicationPeerConfig oldPeerConfig;
+
+ private boolean enabled;
+
public UpdatePeerConfigProcedure() {
}
@@ -54,21 +60,53 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
}
@Override
+ protected boolean reopenRegionsAfterRefresh() {
+ // If we remove some tables from the peer config then we do not need to enter the extra states
+ // for serial replication. Could try to optimize later since it is not easy to determine this...
+ return peerConfig.isSerial() && (!oldPeerConfig.isSerial() ||
+ !ReplicationUtils.isNamespacesAndTableCFsEqual(peerConfig, oldPeerConfig));
+ }
+
+ @Override
+ protected boolean enablePeerBeforeFinish() {
+ // do not need to test reopenRegionsAfterRefresh since we can only enter here if
+ // reopenRegionsAfterRefresh returns true.
+ return enabled;
+ }
+
+ @Override
+ protected ReplicationPeerConfig getOldPeerConfig() {
+ return oldPeerConfig;
+ }
+
+ @Override
+ protected ReplicationPeerConfig getNewPeerConfig() {
+ return peerConfig;
+ }
+
+ @Override
protected void prePeerModification(MasterProcedureEnv env) throws IOException {
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
cpHost.preUpdateReplicationPeerConfig(peerId, peerConfig);
}
- env.getReplicationPeerManager().preUpdatePeerConfig(peerId, peerConfig);
+ ReplicationPeerDescription desc =
+ env.getReplicationPeerManager().preUpdatePeerConfig(peerId, peerConfig);
+ oldPeerConfig = desc.getPeerConfig();
+ enabled = desc.isEnabled();
}
@Override
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
env.getReplicationPeerManager().updatePeerConfig(peerId, peerConfig);
+ if (enabled && reopenRegionsAfterRefresh()) {
+ env.getReplicationPeerManager().disablePeer(peerId);
+ }
}
@Override
- protected void postPeerModification(MasterProcedureEnv env) throws IOException {
+ protected void postPeerModification(MasterProcedureEnv env)
+ throws IOException, ReplicationException {
LOG.info("Successfully updated peer config of {} to {}", peerId, peerConfig);
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
@@ -79,14 +117,23 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.serializeStateData(serializer);
- serializer.serialize(UpdatePeerConfigStateData.newBuilder()
- .setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)).build());
+ UpdatePeerConfigStateData.Builder builder = UpdatePeerConfigStateData.newBuilder()
+ .setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
+ if (oldPeerConfig != null) {
+ builder.setOldPeerConfig(ReplicationPeerConfigUtil.convert(oldPeerConfig));
+ }
+ serializer.serialize(builder.build());
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.deserializeStateData(serializer);
- peerConfig = ReplicationPeerConfigUtil
- .convert(serializer.deserialize(UpdatePeerConfigStateData.class).getPeerConfig());
+ UpdatePeerConfigStateData data = serializer.deserialize(UpdatePeerConfigStateData.class);
+ peerConfig = ReplicationPeerConfigUtil.convert(data.getPeerConfig());
+ if (data.hasOldPeerConfig()) {
+ oldPeerConfig = ReplicationPeerConfigUtil.convert(data.getOldPeerConfig());
+ } else {
+ oldPeerConfig = null;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/64061f89/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
index a02d181..78c1977 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.yetus.audience.InterfaceAudience;
@@ -99,19 +100,26 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
@Override
public void updatePeerConfig(String peerId) throws ReplicationException, IOException {
Lock peerLock = peersLock.acquireLock(peerId);
+ ReplicationPeers peers = replicationSourceManager.getReplicationPeers();
ReplicationPeerImpl peer = null;
ReplicationPeerConfig oldConfig = null;
+ PeerState oldState = null;
boolean success = false;
try {
- peer = replicationSourceManager.getReplicationPeers().getPeer(peerId);
+ peer = peers.getPeer(peerId);
if (peer == null) {
throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
}
oldConfig = peer.getPeerConfig();
- ReplicationPeerConfig newConfig =
- replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId);
+ oldState = peer.getPeerState();
+ ReplicationPeerConfig newConfig = peers.refreshPeerConfig(peerId);
+ // also need to refresh peer state here. When updating a serial replication peer we may
+ // disable it first and then enable it.
+ PeerState newState = peers.refreshPeerState(peerId);
// RS need to start work with the new replication config change
- if (!ReplicationUtils.isKeyConfigEqual(oldConfig, newConfig)) {
+ if (!ReplicationUtils.isNamespacesAndTableCFsEqual(oldConfig, newConfig) ||
+ oldConfig.isSerial() != newConfig.isSerial() ||
+ (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED))) {
replicationSourceManager.refreshSources(peerId);
}
success = true;
@@ -119,6 +127,7 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
if (!success && peer != null) {
// Reset peer config if refresh source failed
peer.setPeerConfig(oldConfig);
+ peer.setPeerState(oldState.equals(PeerState.ENABLED));
}
peerLock.unlock();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/64061f89/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 23e1115..3ecc50a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -510,7 +510,7 @@ public class ReplicationSourceManager implements ReplicationListener {
// synchronized on walsById to avoid race with preLogRoll
synchronized (this.walsById) {
NavigableSet<String> wals = walsById.get(queueId).get(logPrefix);
- if (wals != null && !wals.first().equals(log)) {
+ if (wals != null) {
cleanOldLogs(wals, log, inclusive, queueId);
}
}
@@ -755,7 +755,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* @return a sorted set of wal names
*/
@VisibleForTesting
- Map<String, Map<String, NavigableSet<String>>> getWALs() {
+ public Map<String, Map<String, NavigableSet<String>>> getWALs() {
return Collections.unmodifiableMap(walsById);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/64061f89/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
index 960d473..22b2de7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -157,4 +157,12 @@ class WALEntryBatch {
public void setLastSeqId(String region, long sequenceId) {
lastSeqIds.put(region, sequenceId);
}
+
+ @Override
+ public String toString() {
+ return "WALEntryBatch [walEntries=" + walEntries + ", lastWalPath=" + lastWalPath +
+ ", lastWalPosition=" + lastWalPosition + ", nbRowKeys=" + nbRowKeys + ", nbHFiles=" +
+ nbHFiles + ", heapSize=" + heapSize + ", lastSeqIds=" + lastSeqIds + ", endOfFile=" +
+ endOfFile + "]";
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/64061f89/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
new file mode 100644
index 0000000..83afd81
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.UUID;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+/**
+ * Base class for testing serial replication.
+ */
+public class SerialReplicationTestBase {
+
+ protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ protected static String PEER_ID = "1";
+
+ protected static byte[] CF = Bytes.toBytes("CF");
+
+ protected static byte[] CQ = Bytes.toBytes("CQ");
+
+ protected static FileSystem FS;
+
+ protected static Path LOG_DIR;
+
+ protected static WALProvider.Writer WRITER;
+
+ @Rule
+ public final TestName name = new TestName();
+
+ protected Path logPath;
+
+ public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint {
+
+ private static final UUID PEER_UUID = UUID.randomUUID();
+
+ @Override
+ public UUID getPeerUUID() {
+ return PEER_UUID;
+ }
+
+ @Override
+ public boolean replicate(ReplicateContext replicateContext) {
+ synchronized (WRITER) {
+ try {
+ for (Entry entry : replicateContext.getEntries()) {
+ WRITER.append(entry);
+ }
+ WRITER.sync(false);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void start() {
+ startAsync();
+ }
+
+ @Override
+ public void stop() {
+ stopAsync();
+ }
+
+ @Override
+ protected void doStart() {
+ notifyStarted();
+ }
+
+ @Override
+ protected void doStop() {
+ notifyStopped();
+ }
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10);
+ UTIL.startMiniCluster(3);
+ // disable balancer
+ UTIL.getAdmin().balancerSwitch(false, true);
+ LOG_DIR = UTIL.getDataTestDirOnTestFS("replicated");
+ FS = UTIL.getTestFileSystem();
+ FS.mkdirs(LOG_DIR);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ UTIL.getAdmin().removeReplicationPeer(PEER_ID);
+ rollAllWALs();
+ if (WRITER != null) {
+ WRITER.close();
+ WRITER = null;
+ }
+ }
+
+ protected static void moveRegion(RegionInfo region, HRegionServer rs) throws Exception {
+ UTIL.getAdmin().move(region.getEncodedNameAsBytes(),
+ Bytes.toBytes(rs.getServerName().getServerName()));
+ UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+ @Override
+ public boolean evaluate() throws Exception {
+ return rs.getRegion(region.getEncodedName()) != null;
+ }
+
+ @Override
+ public String explainFailure() throws Exception {
+ return region + " is still not on " + rs;
+ }
+ });
+ }
+
+ protected static void rollAllWALs() throws Exception {
+ for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) {
+ t.getRegionServer().getWalRoller().requestRollAll();
+ }
+ UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+ @Override
+ public boolean evaluate() throws Exception {
+ return UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().stream()
+ .map(t -> t.getRegionServer()).allMatch(HRegionServer::walRollRequestFinished);
+ }
+
+ @Override
+ public String explainFailure() throws Exception {
+ return "Log roll has not finished yet";
+ }
+ });
+ }
+
+ protected final void setupWALWriter() throws IOException {
+ logPath = new Path(LOG_DIR, name.getMethodName());
+ WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration());
+ }
+
+ protected final void waitUntilReplicationDone(int expectedEntries) throws Exception {
+ UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+ @Override
+ public boolean evaluate() throws Exception {
+ try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) {
+ int count = 0;
+ while (reader.next() != null) {
+ count++;
+ }
+ return count >= expectedEntries;
+ } catch (IOException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public String explainFailure() throws Exception {
+ return "Not enough entries replicated";
+ }
+ });
+ }
+
+ protected final void addPeer(boolean enabled) throws IOException {
+ UTIL.getAdmin().addReplicationPeer(PEER_ID,
+ ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
+ .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).setSerial(true)
+ .build(),
+ enabled);
+ }
+
+ /**
+  * Reads back every entry that the test replication endpoint wrote to {@code logPath} and
+  * asserts that the sequence id never decreases from one entry to the next, and that exactly
+  * {@code expectedEntries} entries are present.
+  * <p>
+  * NOTE(review): the comparison is made across all entries in the file regardless of region, so
+  * this presumably expects callers to write to a single-region table - confirm before reusing it
+  * for split/merge scenarios.
+  *
+  * @param expectedEntries the exact number of entries the replicated log must contain
+  * @throws IOException if the replicated log cannot be opened or read
+  */
+ protected final void checkOrder(int expectedEntries) throws IOException {
+   try (WAL.Reader reader =
+     WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
+     long seqId = -1L;
+     int count = 0;
+     for (Entry entry;;) {
+       entry = reader.next();
+       if (entry == null) {
+         break;
+       }
+       long currentSeqId = entry.getKey().getSequenceId();
+       assertTrue("Sequence id go backwards from " + seqId + " to " + currentSeqId,
+         currentSeqId >= seqId);
+       // Remember the id just checked; without this every entry is only ever compared against
+       // the initial -1 and the ordering assertion can never fail.
+       seqId = currentSeqId;
+       count++;
+     }
+     assertEquals(expectedEntries, count);
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/64061f89/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
new file mode 100644
index 0000000..64b5bb1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestAddToSerialReplicationPeer.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.Collections;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+/**
+ * Testcase for HBASE-20147.
+ */
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestAddToSerialReplicationPeer extends SerialReplicationTestBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestAddToSerialReplicationPeer.class);
+
+ @Before
+ public void setUp() throws IOException, StreamLacksCapabilityException {
+ setupWALWriter();
+ }
+
+ // make sure that we will start replication for the sequence id after move, that's what we want to
+ // test here.
+ private void moveRegionAndArchiveOldWals(RegionInfo region, HRegionServer rs) throws Exception {
+ moveRegion(region, rs);
+ rollAllWALs();
+ }
+
+ private void waitUntilReplicatedToTheCurrentWALFile(HRegionServer rs) throws Exception {
+ Path path = ((AbstractFSWAL<?>) rs.getWAL(null)).getCurrentFileName();
+ String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(path.getName());
+ UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+ @Override
+ public boolean evaluate() throws Exception {
+ ReplicationSourceManager manager =
+ ((Replication) rs.getReplicationSourceService()).getReplicationManager();
+ return manager.getWALs().get(PEER_ID).get(logPrefix).size() == 1;
+ }
+
+ @Override
+ public String explainFailure() throws Exception {
+ return "Still not replicated to the current WAL file yet";
+ }
+ });
+ }
+
+ @Test
+ public void testAddPeer() throws Exception {
+ TableName tableName = TableName.valueOf(name.getMethodName());
+ UTIL.getAdmin().createTable(
+ TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
+ .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
+ UTIL.waitTableAvailable(tableName);
+ try (Table table = UTIL.getConnection().getTable(tableName)) {
+ for (int i = 0; i < 100; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+ }
+ }
+ RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
+ HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
+ moveRegionAndArchiveOldWals(region, rs);
+ addPeer(true);
+ try (Table table = UTIL.getConnection().getTable(tableName)) {
+ for (int i = 0; i < 100; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+ }
+ }
+ waitUntilReplicationDone(100);
+ checkOrder(100);
+ }
+
+ @Test
+ public void testChangeToSerial() throws Exception {
+ ReplicationPeerConfig peerConfig =
+ ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
+ .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build();
+ UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);
+
+ TableName tableName = TableName.valueOf(name.getMethodName());
+
+ UTIL.getAdmin().createTable(
+ TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
+ .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
+ UTIL.waitTableAvailable(tableName);
+ try (Table table = UTIL.getConnection().getTable(tableName)) {
+ for (int i = 0; i < 100; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+ }
+ }
+
+ RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
+ HRegionServer srcRs = UTIL.getRSForFirstRegionInTable(tableName);
+ HRegionServer rs = UTIL.getOtherRegionServer(srcRs);
+ moveRegionAndArchiveOldWals(region, rs);
+ waitUntilReplicationDone(100);
+ waitUntilReplicatedToTheCurrentWALFile(srcRs);
+
+ UTIL.getAdmin().disableReplicationPeer(PEER_ID);
+ UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
+ ReplicationPeerConfig.newBuilder(peerConfig).setSerial(true).build());
+ UTIL.getAdmin().enableReplicationPeer(PEER_ID);
+
+ try (Table table = UTIL.getConnection().getTable(tableName)) {
+ for (int i = 0; i < 100; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+ }
+ }
+ waitUntilReplicationDone(200);
+ checkOrder(200);
+ }
+
+ @Test
+ public void testAddToSerialPeer() throws Exception {
+ ReplicationPeerConfig peerConfig =
+ ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
+ .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName())
+ .setReplicateAllUserTables(false).setSerial(true).build();
+ UTIL.getAdmin().addReplicationPeer(PEER_ID, peerConfig, true);
+
+ TableName tableName = TableName.valueOf(name.getMethodName());
+ UTIL.getAdmin().createTable(
+ TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
+ .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
+ UTIL.waitTableAvailable(tableName);
+ try (Table table = UTIL.getConnection().getTable(tableName)) {
+ for (int i = 0; i < 100; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+ }
+ }
+ RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
+ HRegionServer srcRs = UTIL.getRSForFirstRegionInTable(tableName);
+ HRegionServer rs = UTIL.getOtherRegionServer(srcRs);
+ moveRegionAndArchiveOldWals(region, rs);
+ waitUntilReplicatedToTheCurrentWALFile(rs);
+ UTIL.getAdmin().disableReplicationPeer(PEER_ID);
+ UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
+ ReplicationPeerConfig.newBuilder(peerConfig)
+ .setTableCFsMap(ImmutableMap.of(tableName, Collections.emptyList())).build());
+ UTIL.getAdmin().enableReplicationPeer(PEER_ID);
+ try (Table table = UTIL.getConnection().getTable(tableName)) {
+ for (int i = 0; i < 100; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+ }
+ }
+ waitUntilReplicationDone(100);
+ checkOrder(100);
+ }
+
+ @Test
+ public void testDisabledTable() throws Exception {
+ TableName tableName = TableName.valueOf(name.getMethodName());
+ UTIL.getAdmin().createTable(
+ TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
+ .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
+ UTIL.waitTableAvailable(tableName);
+ try (Table table = UTIL.getConnection().getTable(tableName)) {
+ for (int i = 0; i < 100; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+ }
+ }
+ UTIL.getAdmin().disableTable(tableName);
+ rollAllWALs();
+ addPeer(true);
+ UTIL.getAdmin().enableTable(tableName);
+ try (Table table = UTIL.getConnection().getTable(tableName)) {
+ for (int i = 0; i < 100; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+ }
+ }
+ waitUntilReplicationDone(100);
+ checkOrder(100);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/64061f89/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
index 7b440ce..25333ad 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
@@ -23,211 +23,49 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
-import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
-import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALProvider;
-import org.junit.After;
-import org.junit.AfterClass;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.ClassRule;
-import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-@Category({ ReplicationTests.class, LargeTests.class })
-public class TestSerialReplication {
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestSerialReplication extends SerialReplicationTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSerialReplication.class);
- private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
-
- private static String PEER_ID = "1";
-
- private static byte[] CF = Bytes.toBytes("CF");
-
- private static byte[] CQ = Bytes.toBytes("CQ");
-
- private static FileSystem FS;
-
- private static Path LOG_DIR;
-
- private static WALProvider.Writer WRITER;
-
- public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint {
-
- private static final UUID PEER_UUID = UUID.randomUUID();
-
- @Override
- public UUID getPeerUUID() {
- return PEER_UUID;
- }
-
- @Override
- public boolean replicate(ReplicateContext replicateContext) {
- synchronized (WRITER) {
- try {
- for (Entry entry : replicateContext.getEntries()) {
- WRITER.append(entry);
- }
- WRITER.sync(false);
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
- return true;
- }
-
- @Override
- public void start() {
- startAsync();
- }
-
- @Override
- public void stop() {
- stopAsync();
- }
-
- @Override
- protected void doStart() {
- notifyStarted();
- }
-
- @Override
- protected void doStop() {
- notifyStopped();
- }
- }
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10);
- UTIL.startMiniCluster(3);
- // disable balancer
- UTIL.getAdmin().balancerSwitch(false, true);
- LOG_DIR = UTIL.getDataTestDirOnTestFS("replicated");
- FS = UTIL.getTestFileSystem();
- FS.mkdirs(LOG_DIR);
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- UTIL.shutdownMiniCluster();
- }
-
- @Rule
- public final TestName name = new TestName();
-
- private Path logPath;
-
@Before
public void setUp() throws IOException, StreamLacksCapabilityException {
- logPath = new Path(LOG_DIR, name.getMethodName());
- WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration());
+ setupWALWriter();
// add in disable state, so later when enabling it all sources will start push together.
- UTIL.getAdmin().addReplicationPeer(PEER_ID,
- ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
- .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).setSerial(true)
- .build(),
- false);
- }
-
- @After
- public void tearDown() throws Exception {
- UTIL.getAdmin().removeReplicationPeer(PEER_ID);
- for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) {
- t.getRegionServer().getWalRoller().requestRollAll();
- }
- UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
-
- @Override
- public boolean evaluate() throws Exception {
- return UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().stream()
- .map(t -> t.getRegionServer()).allMatch(HRegionServer::walRollRequestFinished);
- }
-
- @Override
- public String explainFailure() throws Exception {
- return "Log roll has not finished yet";
- }
- });
- for (RegionServerThread t : UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) {
- t.getRegionServer().getWalRoller().requestRollAll();
- }
- if (WRITER != null) {
- WRITER.close();
- WRITER = null;
- }
- }
-
- private void moveRegion(RegionInfo region, HRegionServer rs) throws Exception {
- UTIL.getAdmin().move(region.getEncodedNameAsBytes(),
- Bytes.toBytes(rs.getServerName().getServerName()));
- UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
-
- @Override
- public boolean evaluate() throws Exception {
- return rs.getRegion(region.getEncodedName()) != null;
- }
-
- @Override
- public String explainFailure() throws Exception {
- return region + " is still not on " + rs;
- }
- });
+ addPeer(false);
}
private void enablePeerAndWaitUntilReplicationDone(int expectedEntries) throws Exception {
UTIL.getAdmin().enableReplicationPeer(PEER_ID);
- UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
-
- @Override
- public boolean evaluate() throws Exception {
- try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) {
- int count = 0;
- while (reader.next() != null) {
- count++;
- }
- return count >= expectedEntries;
- } catch (IOException e) {
- return false;
- }
- }
-
- @Override
- public String explainFailure() throws Exception {
- return "Not enough entries replicated";
- }
- });
+ waitUntilReplicationDone(expectedEntries);
}
@Test
@@ -251,22 +89,7 @@ public class TestSerialReplication {
}
}
enablePeerAndWaitUntilReplicationDone(200);
- try (WAL.Reader reader =
- WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
- long seqId = -1L;
- int count = 0;
- for (Entry entry;;) {
- entry = reader.next();
- if (entry == null) {
- break;
- }
- assertTrue(
- "Sequence id go backwards from " + seqId + " to " + entry.getKey().getSequenceId(),
- entry.getKey().getSequenceId() >= seqId);
- count++;
- }
- assertEquals(200, count);
- }
+ checkOrder(200);
}
@Test