You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2018/12/07 22:29:18 UTC
[35/51] [abbrv] hbase git commit: HBASE-21486 The current replication
implementation for peer in STANDBY state breaks serial replication
HBASE-21486 The current replication implementation for peer in STANDBY state breaks serial replication
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/766aa1bf
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/766aa1bf
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/766aa1bf
Branch: refs/heads/HBASE-20952
Commit: 766aa1bfccb48b4d228dd86c100fb48e9c9d61fa
Parents: dfeab9f
Author: Duo Zhang <zh...@apache.org>
Authored: Wed Nov 28 18:00:18 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sat Dec 1 12:15:18 2018 +0800
----------------------------------------------------------------------
.../src/main/protobuf/MasterProcedure.proto | 19 ++--
.../replication/AbstractPeerProcedure.java | 97 ++++++++++++++++-
.../master/replication/ModifyPeerProcedure.java | 81 --------------
...ransitPeerSyncReplicationStateProcedure.java | 73 +++++++++----
.../replication/SyncReplicationTestBase.java | 30 ++++--
.../replication/TestSerialSyncReplication.java | 106 +++++++++++++++++++
.../TestSyncReplicationRemoveRemoteWAL.java | 21 +---
7 files changed, 291 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/766aa1bf/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index 44ac952..cc0c6ba 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -398,15 +398,16 @@ enum PeerSyncReplicationStateTransitionState {
PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION = 1;
SET_PEER_NEW_SYNC_REPLICATION_STATE = 2;
REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN = 3;
- REPLAY_REMOTE_WAL_IN_PEER = 4;
- REMOVE_ALL_REPLICATION_QUEUES_IN_PEER = 5;
- REOPEN_ALL_REGIONS_IN_PEER = 6;
- TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE = 7;
- REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END = 8;
- SYNC_REPLICATION_SET_PEER_ENABLED = 9;
- SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS = 10;
- CREATE_DIR_FOR_REMOTE_WAL = 11;
- POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 12;
+ REOPEN_ALL_REGIONS_IN_PEER = 4;
+ SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER = 5;
+ REPLAY_REMOTE_WAL_IN_PEER = 6;
+ REMOVE_ALL_REPLICATION_QUEUES_IN_PEER = 7;
+ TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE = 8;
+ REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END = 9;
+ SYNC_REPLICATION_SET_PEER_ENABLED = 10;
+ SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS = 11;
+ CREATE_DIR_FOR_REMOTE_WAL = 12;
+ POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 13;
}
message PeerModificationStateData {
http://git-wip-us.apache.org/repos/asf/hbase/blob/766aa1bf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
index 882a050..755e0a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
@@ -17,11 +17,27 @@
*/
package org.apache.hadoop.hbase.master.replication;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.TableStateManager;
+import org.apache.hadoop.hbase.master.TableStateManager.TableStateNotFoundException;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -29,8 +45,15 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
* The base class for all replication peer related procedure.
*/
@InterfaceAudience.Private
-public abstract class AbstractPeerProcedure<TState>
- extends AbstractPeerNoLockProcedure<TState> implements PeerProcedureInterface {
+public abstract class AbstractPeerProcedure<TState> extends AbstractPeerNoLockProcedure<TState>
+ implements PeerProcedureInterface {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractPeerProcedure.class);
+
+ protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000;
+
+ // The sleep interval when waiting table to be enabled or disabled.
+ protected static final int SLEEP_INTERVAL_MS = 1000;
// used to keep compatible with old client where we can only returns after updateStorage.
protected ProcedurePrepareLatch latch;
@@ -75,4 +98,74 @@ public abstract class AbstractPeerProcedure<TState>
protected void enablePeer(MasterProcedureEnv env) throws ReplicationException {
env.getReplicationPeerManager().enablePeer(peerId);
}
+
+ /**
+ * Records {@code barrier} as the last pushed sequence id of {@code encodedRegionName} in
+ * {@code lastSeqIds}; negative values are ignored. Once the map holds at least
+ * {@link #UPDATE_LAST_SEQ_ID_BATCH_SIZE} entries it is written to {@code queueStorage} for this
+ * peer and cleared, so callers must write out whatever is left in the map themselves.
+ * @throws ReplicationException if writing the batch to the queue storage fails
+ */
+ private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
+ ReplicationQueueStorage queueStorage) throws ReplicationException {
+ if (barrier >= 0) {
+ lastSeqIds.put(encodedRegionName, barrier);
+ // Flush in batches to bound the size of each storage update.
+ if (lastSeqIds.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
+ queueStorage.setLastSequenceIds(peerId, lastSeqIds);
+ lastSeqIds.clear();
+ }
+ }
+ }
+
+ /**
+ * Sets the last pushed sequence id, for this peer, of every region of every table that has global
+ * replication scope and is covered by {@code peerConfig}. Entries are written to the replication
+ * queue storage in batches of at most {@link #UPDATE_LAST_SEQ_ID_BATCH_SIZE}, and the final
+ * partial batch is written here.
+ * @param env the master procedure environment
+ * @param peerConfig the peer config used to decide which tables are replicated to this peer
+ * @throws IOException if reading table metadata fails, or if interrupted while waiting for a
+ * disabling table
+ * @throws ReplicationException if writing to the replication queue storage fails
+ */
+ protected final void setLastPushedSequenceId(MasterProcedureEnv env,
+ ReplicationPeerConfig peerConfig) throws IOException, ReplicationException {
+ Map<String, Long> lastSeqIds = new HashMap<String, Long>();
+ for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
+ if (!td.hasGlobalReplicationScope()) {
+ continue;
+ }
+ TableName tn = td.getTableName();
+ if (!ReplicationUtils.contains(peerConfig, tn)) {
+ continue;
+ }
+ setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
+ }
+ // Write out the remaining entries that did not fill a whole batch.
+ if (!lastSeqIds.isEmpty()) {
+ env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds);
+ }
+ }
+
+ // If the table is currently DISABLING, we need to wait until it is disabled, polling every
+ // SLEEP_INTERVAL_MS, because we will write replication barriers for a disabled table. Returns
+ // whether we need to update the last pushed sequence id: if the table has been deleted already,
+ // i.e. we hit TableStateNotFoundException, there is nothing to update and we return false.
+ // An interrupt while sleeping is rethrown as an InterruptedIOException.
+ private boolean needSetLastPushedSequenceId(TableStateManager tsm, TableName tn)
+ throws IOException {
+ for (;;) {
+ try {
+ if (!tsm.getTableState(tn).isDisabling()) {
+ return true;
+ }
+ Thread.sleep(SLEEP_INTERVAL_MS);
+ } catch (TableStateNotFoundException e) {
+ return false;
+ } catch (InterruptedException e) {
+ throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
+ }
+ }
+ }
+
+ // Puts an encodedRegionName -> lastPushedSeqId entry into the passed-in map for every region of
+ // the table, where lastPushedSeqId is the region's last replication barrier minus one (negative
+ // values are skipped). Whenever the map grows large enough we call
+ // queueStorage.setLastSequenceIds and clear it, so the caller must check whether the map is
+ // empty at the end and, if not, call queueStorage.setLastSequenceIds to write out the remaining
+ // entries. A table that is currently DISABLING is waited on first, and a table that has already
+ // been deleted is skipped.
+ protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, TableName tableName,
+ Map<String, Long> lastSeqIds) throws IOException, ReplicationException {
+ TableStateManager tsm = env.getMasterServices().getTableStateManager();
+ ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
+ Connection conn = env.getMasterServices().getConnection();
+ if (!needSetLastPushedSequenceId(tsm, tableName)) {
+ LOG.debug("Skip setting last pushed sequence id for {}", tableName);
+ return;
+ }
+ for (Pair<String, Long> name2Barrier : MetaTableAccessor
+ .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
+ LOG.trace("Update last pushed sequence id for {}, {}", tableName, name2Barrier);
+ addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
+ queueStorage);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/766aa1bf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
index 9550fb0..d5d2779 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
@@ -19,11 +19,7 @@ package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import java.io.InterruptedIOException;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.TableStateManager;
@@ -35,9 +31,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,11 +49,6 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
private static final Logger LOG = LoggerFactory.getLogger(ModifyPeerProcedure.class);
- protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000;
-
- // The sleep interval when waiting table to be enabled or disabled.
- protected static final int SLEEP_INTERVAL_MS = 1000;
-
protected ModifyPeerProcedure() {
}
@@ -169,76 +158,6 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
}
}
- private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
- ReplicationQueueStorage queueStorage) throws ReplicationException {
- if (barrier >= 0) {
- lastSeqIds.put(encodedRegionName, barrier);
- if (lastSeqIds.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
- queueStorage.setLastSequenceIds(peerId, lastSeqIds);
- lastSeqIds.clear();
- }
- }
- }
-
- protected final void setLastPushedSequenceId(MasterProcedureEnv env,
- ReplicationPeerConfig peerConfig) throws IOException, ReplicationException {
- Map<String, Long> lastSeqIds = new HashMap<String, Long>();
- for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
- if (!td.hasGlobalReplicationScope()) {
- continue;
- }
- TableName tn = td.getTableName();
- if (!ReplicationUtils.contains(peerConfig, tn)) {
- continue;
- }
- setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
- }
- if (!lastSeqIds.isEmpty()) {
- env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds);
- }
- }
-
- // If the table is currently disabling, then we need to wait until it is disabled.We will write
- // replication barrier for a disabled table. And return whether we need to update the last pushed
- // sequence id, if the table has been deleted already, i.e, we hit TableStateNotFoundException,
- // then we do not need to update last pushed sequence id for this table.
- private boolean needSetLastPushedSequenceId(TableStateManager tsm, TableName tn)
- throws IOException {
- for (;;) {
- try {
- if (!tsm.getTableState(tn).isDisabling()) {
- return true;
- }
- Thread.sleep(SLEEP_INTERVAL_MS);
- } catch (TableStateNotFoundException e) {
- return false;
- } catch (InterruptedException e) {
- throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
- }
- }
- }
-
- // Will put the encodedRegionName->lastPushedSeqId pair into the map passed in, if the map is
- // large enough we will call queueStorage.setLastSequenceIds and clear the map. So the caller
- // should not forget to check whether the map is empty at last, if not you should call
- // queueStorage.setLastSequenceIds to write out the remaining entries in the map.
- protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, TableName tableName,
- Map<String, Long> lastSeqIds) throws IOException, ReplicationException {
- TableStateManager tsm = env.getMasterServices().getTableStateManager();
- ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
- Connection conn = env.getMasterServices().getConnection();
- if (!needSetLastPushedSequenceId(tsm, tableName)) {
- LOG.debug("Skip settting last pushed sequence id for {}", tableName);
- return;
- }
- for (Pair<String, Long> name2Barrier : MetaTableAccessor
- .getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
- LOG.trace("Update last pushed sequence id for {}, {}", tableName, name2Barrier);
- addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
- queueStorage);
- }
- }
-
@Override
protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
throws ProcedureSuspendedException, InterruptedException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/766aa1bf/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
index 8c6232f..fcf41be 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
@@ -50,7 +50,7 @@ public class TransitPeerSyncReplicationStateProcedure
extends AbstractPeerProcedure<PeerSyncReplicationStateTransitionState> {
private static final Logger LOG =
- LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class);
+ LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class);
protected SyncReplicationState fromState;
@@ -58,6 +58,8 @@ public class TransitPeerSyncReplicationStateProcedure
private boolean enabled;
+ private boolean serial;
+
public TransitPeerSyncReplicationStateProcedure() {
}
@@ -75,8 +77,8 @@ public class TransitPeerSyncReplicationStateProcedure
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.serializeStateData(serializer);
TransitPeerSyncReplicationStateStateData.Builder builder =
- TransitPeerSyncReplicationStateStateData.newBuilder()
- .setToState(ReplicationPeerConfigUtil.toSyncReplicationState(toState));
+ TransitPeerSyncReplicationStateStateData.newBuilder()
+ .setToState(ReplicationPeerConfigUtil.toSyncReplicationState(toState));
if (fromState != null) {
builder.setFromState(ReplicationPeerConfigUtil.toSyncReplicationState(fromState));
}
@@ -87,7 +89,7 @@ public class TransitPeerSyncReplicationStateProcedure
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.deserializeStateData(serializer);
TransitPeerSyncReplicationStateStateData data =
- serializer.deserialize(TransitPeerSyncReplicationStateStateData.class);
+ serializer.deserialize(TransitPeerSyncReplicationStateStateData.class);
toState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getToState());
if (data.hasFromState()) {
fromState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getFromState());
@@ -129,6 +131,7 @@ public class TransitPeerSyncReplicationStateProcedure
}
fromState = desc.getSyncReplicationState();
enabled = desc.isEnabled();
+ serial = desc.getPeerConfig().isSerial();
}
private void postTransit(MasterProcedureEnv env) throws IOException {
@@ -174,7 +177,11 @@ public class TransitPeerSyncReplicationStateProcedure
: PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
} else {
assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE);
- setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
+ // for serial peer, we need to reopen all the regions and then update the last pushed sequence
+ // id, before replaying any remote wals, so that the serial replication will not be stuck, and
+ // also guarantee the order when replicating the remote wal back.
+ setNextState(serial ? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER
+ : PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
}
}
@@ -183,6 +190,11 @@ public class TransitPeerSyncReplicationStateProcedure
setNextState(
enabled ? PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED
: PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
+ } else if (fromState == SyncReplicationState.STANDBY) {
+ assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE);
+ setNextState(serial && enabled
+ ? PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED
+ : PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
} else {
setNextState(
PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
@@ -196,14 +208,20 @@ public class TransitPeerSyncReplicationStateProcedure
@VisibleForTesting
protected void setPeerNewSyncReplicationState(MasterProcedureEnv env)
throws ReplicationException {
- env.getReplicationPeerManager().setPeerNewSyncReplicationState(peerId, toState);
- if (toState.equals(SyncReplicationState.STANDBY) && enabled) {
- // disable the peer if we are going to transit to STANDBY state, as we need to remove
+ if (toState.equals(SyncReplicationState.STANDBY) ||
+ (fromState.equals(SyncReplicationState.STANDBY) && serial) && enabled) {
+ // Disable the peer if we are going to transit to STANDBY state, as we need to remove
// all the pending replication files. If we do not disable the peer and delete the wal
// queues on zk directly, RS will get NoNode exception when updating the wal position
// and crash.
+ // Disable the peer if we are going to transit from STANDBY to DOWNGRADE_ACTIVE, and the
+ // replication is serial, as we need to update the lastPushedSequence id after we reopen all
+ // the regions, and for performance reason here we will update in batch, without using CAS, if
+ // we are still replicating at RS side, we may accidentally update the last pushed sequence id
+ // to a less value and cause the replication to be stuck.
env.getReplicationPeerManager().disablePeer(peerId);
}
+ env.getReplicationPeerManager().setPeerNewSyncReplicationState(peerId, toState);
}
@VisibleForTesting
@@ -240,7 +258,7 @@ public class TransitPeerSyncReplicationStateProcedure
long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
LOG.warn(
"Failed to update peer storage for peer {} when starting transiting sync " +
- "replication peer state from {} to {}, sleep {} secs and retry",
+ "replication peer state from {} to {}, sleep {} secs and retry",
peerId, fromState, toState, backoff / 1000, e);
throw suspend(backoff);
}
@@ -254,6 +272,30 @@ public class TransitPeerSyncReplicationStateProcedure
.toArray(RefreshPeerProcedure[]::new));
setNextStateAfterRefreshBegin();
return Flow.HAS_MORE_STATE;
+ case REOPEN_ALL_REGIONS_IN_PEER:
+ reopenRegions(env);
+ if (fromState.equals(SyncReplicationState.STANDBY)) {
+ assert serial;
+ setNextState(
+ PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER);
+ } else {
+ setNextState(
+ PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
+ }
+ return Flow.HAS_MORE_STATE;
+ case SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER:
+ try {
+ setLastPushedSequenceId(env, env.getReplicationPeerManager().getPeerConfig(peerId).get());
+ } catch (Exception e) {
+ long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
+ LOG.warn(
+ "Failed to update last pushed sequence id for peer {} when transiting sync " +
+ "replication peer state from {} to {}, sleep {} secs and retry",
+ peerId, fromState, toState, backoff / 1000, e);
+ throw suspend(backoff);
+ }
+ setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
+ return Flow.HAS_MORE_STATE;
case REPLAY_REMOTE_WAL_IN_PEER:
replayRemoteWAL(env.getReplicationPeerManager().getPeerConfig(peerId).get().isSerial());
setNextState(
@@ -266,7 +308,7 @@ public class TransitPeerSyncReplicationStateProcedure
long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
LOG.warn(
"Failed to remove all replication queues peer {} when starting transiting" +
- " sync replication peer state from {} to {}, sleep {} secs and retry",
+ " sync replication peer state from {} to {}, sleep {} secs and retry",
peerId, fromState, toState, backoff / 1000, e);
throw suspend(backoff);
}
@@ -275,11 +317,6 @@ public class TransitPeerSyncReplicationStateProcedure
? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER
: PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
return Flow.HAS_MORE_STATE;
- case REOPEN_ALL_REGIONS_IN_PEER:
- reopenRegions(env);
- setNextState(
- PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
- return Flow.HAS_MORE_STATE;
case TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE:
try {
transitPeerSyncReplicationState(env);
@@ -287,7 +324,7 @@ public class TransitPeerSyncReplicationStateProcedure
long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
LOG.warn(
"Failed to update peer storage for peer {} when ending transiting sync " +
- "replication peer state from {} to {}, sleep {} secs and retry",
+ "replication peer state from {} to {}, sleep {} secs and retry",
peerId, fromState, toState, backoff / 1000, e);
throw suspend(backoff);
}
@@ -308,7 +345,7 @@ public class TransitPeerSyncReplicationStateProcedure
long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
LOG.warn(
"Failed to set peer enabled for peer {} when transiting sync replication peer " +
- "state from {} to {}, sleep {} secs and retry",
+ "state from {} to {}, sleep {} secs and retry",
peerId, fromState, toState, backoff / 1000, e);
throw suspend(backoff);
}
@@ -327,7 +364,7 @@ public class TransitPeerSyncReplicationStateProcedure
long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
LOG.warn(
"Failed to create remote wal dir for peer {} when transiting sync replication " +
- "peer state from {} to {}, sleep {} secs and retry",
+ "peer state from {} to {}, sleep {} secs and retry",
peerId, fromState, toState, backoff / 1000, e);
throw suspend(backoff);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/766aa1bf/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
index 1b52354..f373590 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -103,8 +103,8 @@ public class SyncReplicationTestBase {
ZK_UTIL.startMiniZKCluster();
initTestingUtility(UTIL1, "/cluster1");
initTestingUtility(UTIL2, "/cluster2");
- StartMiniClusterOption option = StartMiniClusterOption.builder()
- .numMasters(2).numRegionServers(3).numDataNodes(3).build();
+ StartMiniClusterOption option =
+ StartMiniClusterOption.builder().numMasters(2).numRegionServers(3).numDataNodes(3).build();
UTIL1.startMiniCluster(option);
UTIL2.startMiniCluster(option);
TableDescriptor td =
@@ -217,16 +217,16 @@ public class SyncReplicationTestBase {
return getRemoteWALDir(remoteWALDir, peerId);
}
- protected Path getRemoteWALDir(Path remoteWALDir, String peerId) {
+ protected final Path getRemoteWALDir(Path remoteWALDir, String peerId) {
return new Path(remoteWALDir, peerId);
}
- protected Path getReplayRemoteWALs(Path remoteWALDir, String peerId) {
+ protected final Path getReplayRemoteWALs(Path remoteWALDir, String peerId) {
return new Path(remoteWALDir, peerId + "-replay");
}
- protected void verifyRemovedPeer(String peerId, Path remoteWALDir, HBaseTestingUtility utility)
- throws Exception {
+ protected final void verifyRemovedPeer(String peerId, Path remoteWALDir,
+ HBaseTestingUtility utility) throws Exception {
ReplicationPeerStorage rps = ReplicationStorageFactory
.getReplicationPeerStorage(utility.getZooKeeperWatcher(), utility.getConfiguration());
try {
@@ -247,7 +247,7 @@ public class SyncReplicationTestBase {
}
}
- protected void verifyReplicationRequestRejection(HBaseTestingUtility utility,
+ protected final void verifyReplicationRequestRejection(HBaseTestingUtility utility,
boolean expectedRejection) throws Exception {
HRegionServer regionServer = utility.getRSForFirstRegionInTable(TABLE_NAME);
ClusterConnection connection = regionServer.getClusterConnection();
@@ -270,4 +270,20 @@ public class SyncReplicationTestBase {
}
}
}
+
+ /**
+ * Waits up to 30 seconds until {@code remoteWAL} no longer exists on the WAL file system of the
+ * master of {@code util}'s mini cluster. If the wait times out, the failure message names the
+ * file that is still present.
+ * @param util the testing utility whose master file system is checked
+ * @param remoteWAL the remote WAL file expected to be deleted
+ */
+ protected final void waitUntilDeleted(HBaseTestingUtility util, Path remoteWAL) throws Exception {
+ MasterFileSystem mfs = util.getMiniHBaseCluster().getMaster().getMasterFileSystem();
+ util.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+ @Override
+ public boolean evaluate() throws Exception {
+ return !mfs.getWALFileSystem().exists(remoteWAL);
+ }
+
+ @Override
+ public String explainFailure() throws Exception {
+ return remoteWAL + " has not been deleted yet";
+ }
+ });
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/766aa1bf/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialSyncReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialSyncReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialSyncReplication.java
new file mode 100644
index 0000000..6725649
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialSyncReplication.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.hamcrest.CoreMatchers.endsWith;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.LogRoller;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+
+/**
+ * Testcase to confirm that serial replication will not be stuck when used along with synchronous
+ * replication. See HBASE-21486 for more details.
+ * <p>
+ * The peer is made serial on both clusters. Data is written through cluster 1 (ACTIVE) while the
+ * peer on cluster 2 (STANDBY) is disabled, and the resulting remote WAL on cluster 2 is waited on
+ * until it is deleted. Cluster 2 is then transited to DOWNGRADE_ACTIVE, a region is moved, and the
+ * peer is transited back to STANDBY and DOWNGRADE_ACTIVE again. Finally the peer is re-enabled and
+ * the test checks that replication from cluster 2 back to cluster 1 still works.
+ */
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSerialSyncReplication extends SyncReplicationTestBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestSerialSyncReplication.class);
+
+ @Test
+ public void test() throws Exception {
+ // change to serial
+ UTIL1.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig
+ .newBuilder(UTIL1.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(true).build());
+ UTIL2.getAdmin().updateReplicationPeerConfig(PEER_ID, ReplicationPeerConfig
+ .newBuilder(UTIL2.getAdmin().getReplicationPeerConfig(PEER_ID)).setSerial(true).build());
+
+ // Cluster 2 becomes STANDBY and cluster 1 becomes ACTIVE.
+ UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.STANDBY);
+ UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.ACTIVE);
+
+ // Keep the peer on cluster 2 disabled until the end of the test.
+ UTIL2.getAdmin().disableReplicationPeer(PEER_ID);
+
+ writeAndVerifyReplication(UTIL1, UTIL2, 0, 100);
+
+ // Exactly one remote WAL for the peer is expected on cluster 2's WAL file system.
+ MasterFileSystem mfs = UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem();
+ Path remoteWALDir = ReplicationUtils.getPeerRemoteWALDir(
+ new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME), PEER_ID);
+ FileStatus[] remoteWALStatus = mfs.getWALFileSystem().listStatus(remoteWALDir);
+ assertEquals(1, remoteWALStatus.length);
+ Path remoteWAL = remoteWALStatus[0].getPath();
+ assertThat(remoteWAL.getName(), endsWith(ReplicationUtils.SYNC_WAL_SUFFIX));
+ // roll the wal writer, so that we will delete the remote wal. This is used to make sure that we
+ // will not replay this wal when transiting to DA.
+ for (RegionServerThread t : UTIL1.getMiniHBaseCluster().getRegionServerThreads()) {
+ LogRoller roller = t.getRegionServer().getWalRoller();
+ roller.requestRollAll();
+ roller.waitUntilWalRollFinished();
+ }
+ waitUntilDeleted(UTIL2, remoteWAL);
+
+ UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.DOWNGRADE_ACTIVE);
+ UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.STANDBY);
+ // let's reopen the region
+ RegionInfo region = Iterables.getOnlyElement(UTIL2.getAdmin().getRegions(TABLE_NAME));
+ HRegionServer target = UTIL2.getOtherRegionServer(UTIL2.getRSForFirstRegionInTable(TABLE_NAME));
+ UTIL2.getAdmin().move(region.getEncodedNameAsBytes(),
+ Bytes.toBytes(target.getServerName().getServerName()));
+ // here we will remove all the pending wals. This is not a normal operation sequence but anyway,
+ // user could do this.
+ UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.STANDBY);
+ // transit back to DA
+ UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.DOWNGRADE_ACTIVE);
+
+ UTIL2.getAdmin().enableReplicationPeer(PEER_ID);
+ // make sure that the async replication still works
+ writeAndVerifyReplication(UTIL2, UTIL1, 100, 200);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/766aa1bf/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
index 0cd1846..9f89826 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationRemoveRemoteWAL.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertTrue;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -41,22 +40,6 @@ public class TestSyncReplicationRemoveRemoteWAL extends SyncReplicationTestBase
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSyncReplicationRemoveRemoteWAL.class);
- private void waitUntilDeleted(Path remoteWAL) throws Exception {
- MasterFileSystem mfs = UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem();
- UTIL1.waitFor(30000, new ExplainingPredicate<Exception>() {
-
- @Override
- public boolean evaluate() throws Exception {
- return !mfs.getWALFileSystem().exists(remoteWAL);
- }
-
- @Override
- public String explainFailure() throws Exception {
- return remoteWAL + " has not been deleted yet";
- }
- });
- }
-
@Test
public void testRemoveRemoteWAL() throws Exception {
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
@@ -76,7 +59,7 @@ public class TestSyncReplicationRemoveRemoteWAL extends SyncReplicationTestBase
HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
rs.getWalRoller().requestRollAll();
// The replicated wal file should be deleted finally
- waitUntilDeleted(remoteWAL);
+ waitUntilDeleted(UTIL2, remoteWAL);
remoteWALStatus = mfs.getWALFileSystem().listStatus(remoteWALDir);
assertEquals(1, remoteWALStatus.length);
remoteWAL = remoteWALStatus[0].getPath();
@@ -95,6 +78,6 @@ public class TestSyncReplicationRemoveRemoteWAL extends SyncReplicationTestBase
verifyThroughRegion(UTIL2, 100, 200);
// Confirm that we will also remove the remote wal files in DA state
- waitUntilDeleted(remoteWAL);
+ waitUntilDeleted(UTIL2, remoteWAL);
}
}