You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/07 06:45:56 UTC
[20/29] hbase git commit: HBASE-20426 Give up replicating anything in S state
HBASE-20426 Give up replicating anything in S state
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cb3ca05e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cb3ca05e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cb3ca05e
Branch: refs/heads/HBASE-19064
Commit: cb3ca05e4b50ba24585c7ffb93cb5a2d2af4a3cf
Parents: f7c5981
Author: zhangduo <zh...@apache.org>
Authored: Thu May 3 15:51:35 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Mon May 7 10:43:47 2018 +0800
----------------------------------------------------------------------
.../src/main/protobuf/MasterProcedure.proto | 13 +-
.../replication/AbstractPeerProcedure.java | 4 +
.../master/replication/ModifyPeerProcedure.java | 6 -
.../replication/ReplicationPeerManager.java | 13 +-
...ransitPeerSyncReplicationStateProcedure.java | 94 +++++++++++----
.../hadoop/hbase/regionserver/LogRoller.java | 11 +-
.../regionserver/PeerProcedureHandlerImpl.java | 63 ++++++++--
.../regionserver/ReplicationSource.java | 1 +
.../regionserver/ReplicationSourceManager.java | 118 ++++++++++++++++---
.../TestDrainReplicationQueuesForStandBy.java | 118 +++++++++++++++++++
10 files changed, 379 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/cb3ca05e/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index 01e4dae..f15cb04 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -394,11 +394,14 @@ enum PeerSyncReplicationStateTransitionState {
SET_PEER_NEW_SYNC_REPLICATION_STATE = 2;
REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN = 3;
REPLAY_REMOTE_WAL_IN_PEER = 4;
- REOPEN_ALL_REGIONS_IN_PEER = 5;
- TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE = 6;
- REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END = 7;
- CREATE_DIR_FOR_REMOTE_WAL = 8;
- POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 9;
+ REMOVE_ALL_REPLICATION_QUEUES_IN_PEER = 5;
+ REOPEN_ALL_REGIONS_IN_PEER = 6;
+ TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE = 7;
+ REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END = 8;
+ SYNC_REPLICATION_SET_PEER_ENABLED = 9;
+ SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS = 10;
+ CREATE_DIR_FOR_REMOTE_WAL = 11;
+ POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 12;
}
message PeerModificationStateData {
http://git-wip-us.apache.org/repos/asf/hbase/blob/cb3ca05e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
index 6679d78..458e073 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
@@ -106,4 +106,8 @@ public abstract class AbstractPeerProcedure<TState>
throw new UnsupportedOperationException();
}
+ protected final void refreshPeer(MasterProcedureEnv env, PeerOperationType type) {
+ addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
+ .map(sn -> new RefreshPeerProcedure(peerId, type, sn)).toArray(RefreshPeerProcedure[]::new));
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cb3ca05e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
index 32b8ea1..56462ca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
@@ -108,12 +108,6 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
throw new UnsupportedOperationException();
}
- private void refreshPeer(MasterProcedureEnv env, PeerOperationType type) {
- addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
- .map(sn -> new RefreshPeerProcedure(peerId, type, sn))
- .toArray(RefreshPeerProcedure[]::new));
- }
-
protected ReplicationPeerConfig getOldPeerConfig() {
return null;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cb3ca05e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 229549e..e1d8b51 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
@@ -192,9 +193,9 @@ public class ReplicationPeerManager {
}
/**
- * @return the old state.
+ * @return the old state, and whether the peer is enabled.
*/
- public SyncReplicationState preTransitPeerSyncReplicationState(String peerId,
+ Pair<SyncReplicationState, Boolean> preTransitPeerSyncReplicationState(String peerId,
SyncReplicationState state) throws DoNotRetryIOException {
ReplicationPeerDescription desc = checkPeerExists(peerId);
SyncReplicationState fromState = desc.getSyncReplicationState();
@@ -203,7 +204,7 @@ public class ReplicationPeerManager {
throw new DoNotRetryIOException("Can not transit current cluster state from " + fromState +
" to " + state + " for peer id=" + peerId);
}
- return fromState;
+ return Pair.newPair(fromState, desc.isEnabled());
}
public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
@@ -303,7 +304,7 @@ public class ReplicationPeerManager {
}
}
- public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
+ public void removeAllQueues(String peerId) throws ReplicationException {
// Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still
// on-going when the refresh peer config procedure is done, if a RS which has already been
// scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in
@@ -317,6 +318,10 @@ public class ReplicationPeerManager {
// unless it has already been removed by others.
ReplicationUtils.removeAllQueues(queueStorage, peerId);
ReplicationUtils.removeAllQueues(queueStorage, peerId);
+ }
+
+ public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
+ removeAllQueues(peerId);
queueStorage.removePeerFromHFileRefs(peerId);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cb3ca05e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
index 99fd615..0175296 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +55,8 @@ public class TransitPeerSyncReplicationStateProcedure
private SyncReplicationState toState;
+ private boolean enabled;
+
public TransitPeerSyncReplicationStateProcedure() {
}
@@ -110,7 +113,10 @@ public class TransitPeerSyncReplicationStateProcedure
if (cpHost != null) {
cpHost.preTransitReplicationPeerSyncReplicationState(peerId, toState);
}
- fromState = env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, toState);
+ Pair<SyncReplicationState, Boolean> pair =
+ env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, toState);
+ fromState = pair.getFirst();
+ enabled = pair.getSecond();
}
private void postTransit(MasterProcedureEnv env) throws IOException {
@@ -131,6 +137,21 @@ public class TransitPeerSyncReplicationStateProcedure
.collect(Collectors.toList());
}
+ private void createDirForRemoteWAL(MasterProcedureEnv env)
+ throws ProcedureYieldException, IOException {
+ MasterFileSystem mfs = env.getMasterFileSystem();
+ Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
+ Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId);
+ FileSystem walFs = mfs.getWALFileSystem();
+ if (walFs.exists(remoteWALDirForPeer)) {
+ LOG.warn("Wal dir {} already exists, usually this should not happen, continue anyway",
+ remoteWALDirForPeer);
+ } else if (!walFs.mkdirs(remoteWALDirForPeer)) {
+ LOG.warn("Can not create remote wal dir {}", remoteWALDirForPeer);
+ throw new ProcedureYieldException();
+ }
+ }
+
@Override
protected Flow executeFromState(MasterProcedureEnv env,
PeerSyncReplicationStateTransitionState state)
@@ -151,6 +172,13 @@ public class TransitPeerSyncReplicationStateProcedure
case SET_PEER_NEW_SYNC_REPLICATION_STATE:
try {
env.getReplicationPeerManager().setPeerNewSyncReplicationState(peerId, toState);
+ if (toState.equals(SyncReplicationState.STANDBY) && enabled) {
+ // disable the peer if we are going to transit to STANDBY state, as we need to remove
+ // all the pending replication files. If we do not disable the peer and delete the wal
+ // queues on zk directly, RS will get NoNode exception when updating the wal position
+ // and crash.
+ env.getReplicationPeerManager().disablePeer(peerId);
+ }
} catch (ReplicationException e) {
LOG.warn("Failed to update peer storage for peer {} when starting transiting sync " +
"replication peer state from {} to {}, retry", peerId, fromState, toState, e);
@@ -163,16 +191,35 @@ public class TransitPeerSyncReplicationStateProcedure
addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
.map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 0))
.toArray(RefreshPeerProcedure[]::new));
- if (fromState == SyncReplicationState.STANDBY &&
- toState == SyncReplicationState.DOWNGRADE_ACTIVE) {
- setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
+ if (fromState.equals(SyncReplicationState.ACTIVE)) {
+ setNextState(toState.equals(SyncReplicationState.STANDBY)
+ ? PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
+ : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
+ } else if (fromState.equals(SyncReplicationState.DOWNGRADE_ACTIVE)) {
+ setNextState(toState.equals(SyncReplicationState.STANDBY)
+ ? PeerSyncReplicationStateTransitionState.REMOVE_ALL_REPLICATION_QUEUES_IN_PEER
+ : PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
} else {
- setNextState(PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
+ assert toState.equals(SyncReplicationState.DOWNGRADE_ACTIVE);
+ setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
}
return Flow.HAS_MORE_STATE;
case REPLAY_REMOTE_WAL_IN_PEER:
addChildProcedure(new RecoverStandbyProcedure(peerId));
- setNextState(PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
+ setNextState(
+ PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
+ return Flow.HAS_MORE_STATE;
+ case REMOVE_ALL_REPLICATION_QUEUES_IN_PEER:
+ try {
+ env.getReplicationPeerManager().removeAllQueues(peerId);
+ } catch (ReplicationException e) {
+ LOG.warn("Failed to remove all replication queues peer {} when starting transiting" +
+ " sync replication peer state from {} to {}, retry", peerId, fromState, toState, e);
+ throw new ProcedureYieldException();
+ }
+ setNextState(fromState.equals(SyncReplicationState.ACTIVE)
+ ? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER
+ : PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
return Flow.HAS_MORE_STATE;
case REOPEN_ALL_REGIONS_IN_PEER:
try {
@@ -202,27 +249,35 @@ public class TransitPeerSyncReplicationStateProcedure
.map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 1))
.toArray(RefreshPeerProcedure[]::new));
if (toState == SyncReplicationState.STANDBY) {
- setNextState(PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
+ setNextState(
+ enabled ? PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_SET_PEER_ENABLED
+ : PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
} else {
setNextState(
PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
}
return Flow.HAS_MORE_STATE;
+ case SYNC_REPLICATION_SET_PEER_ENABLED:
+ try {
+ env.getReplicationPeerManager().enablePeer(peerId);
+ } catch (ReplicationException e) {
+ LOG.warn("Failed to set peer enabled for peer {} when transiting sync replication peer " +
+ "state from {} to {}, retry", peerId, fromState, toState, e);
+ throw new ProcedureYieldException();
+ }
+ setNextState(
+ PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS);
+ return Flow.HAS_MORE_STATE;
+ case SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS:
+ refreshPeer(env, PeerOperationType.ENABLE);
+ setNextState(PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
+ return Flow.HAS_MORE_STATE;
case CREATE_DIR_FOR_REMOTE_WAL:
- MasterFileSystem mfs = env.getMasterFileSystem();
- Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
- Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId);
- FileSystem walFs = mfs.getWALFileSystem();
try {
- if (walFs.exists(remoteWALDirForPeer)) {
- LOG.warn("Wal dir {} already exists, usually this should not happen, continue anyway",
- remoteWALDirForPeer);
- } else if (!walFs.mkdirs(remoteWALDirForPeer)) {
- LOG.warn("Can not create remote wal dir {}", remoteWALDirForPeer);
- throw new ProcedureYieldException();
- }
+ createDirForRemoteWAL(env);
} catch (IOException e) {
- LOG.warn("Failed to create remote wal dir {}", remoteWALDirForPeer, e);
+ LOG.warn("Failed to create remote wal dir for peer {} when transiting sync replication " +
+ "peer state from {} to {}, retry", peerId, fromState, toState, e);
throw new ProcedureYieldException();
}
setNextState(
@@ -242,5 +297,4 @@ public class TransitPeerSyncReplicationStateProcedure
throw new UnsupportedOperationException("unhandled state=" + state);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cb3ca05e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index ab0083f..05a8fdf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -244,10 +244,8 @@ public class LogRoller extends HasThread implements Closeable {
}
/**
- * For testing only
* @return true if all WAL roll finished
*/
- @VisibleForTesting
public boolean walRollFinished() {
for (boolean needRoll : walNeedsRoll.values()) {
if (needRoll) {
@@ -257,6 +255,15 @@ public class LogRoller extends HasThread implements Closeable {
return true;
}
+ /**
+ * Wait until all wals have been rolled after calling {@link #requestRollAll()}.
+ */
+ public void waitUntilWalRollFinished() throws InterruptedException {
+ while (!walRollFinished()) {
+ Thread.sleep(100);
+ }
+ }
+
@Override
public void close() {
running = false;
http://git-wip-us.apache.org/repos/asf/hbase/blob/cb3ca05e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
index 7fc9f53..d01b130 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
@@ -18,8 +18,10 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.concurrent.locks.Lock;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.LogRoller;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -154,24 +156,65 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
if (!peer.getPeerConfig().isSyncReplication()) {
throw new ReplicationException("Peer with id=" + peerId + " is not synchronous.");
}
- SyncReplicationState newState = peer.getNewSyncReplicationState();
+ SyncReplicationState newSyncReplicationState = peer.getNewSyncReplicationState();
if (stage == 0) {
- if (newState != SyncReplicationState.NONE) {
+ if (newSyncReplicationState != SyncReplicationState.NONE) {
LOG.warn("The new sync replication state for peer {} has already been set to {}, " +
- "this should be a retry, give up", peerId, newState);
+ "this should be a retry, give up", peerId, newSyncReplicationState);
return;
}
- newState = replicationPeers.refreshPeerNewSyncReplicationState(peerId);
- SyncReplicationState oldState = peer.getSyncReplicationState();
- peerActionListener.peerSyncReplicationStateChange(peerId, oldState, newState, stage);
+ // refresh the peer state first, as when we transit to STANDBY, we may need to disable the
+ // peer before processing the sync replication state.
+ PeerState oldState = peer.getPeerState();
+ boolean success = false;
+ try {
+ PeerState newState = replicationPeers.refreshPeerState(peerId);
+ if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) {
+ replicationSourceManager.refreshSources(peerId);
+ }
+ success = true;
+ } finally {
+ if (!success) {
+ peer.setPeerState(oldState.equals(PeerState.ENABLED));
+ }
+ }
+ newSyncReplicationState = replicationPeers.refreshPeerNewSyncReplicationState(peerId);
+ SyncReplicationState oldSyncReplicationState = peer.getSyncReplicationState();
+ peerActionListener.peerSyncReplicationStateChange(peerId, oldSyncReplicationState,
+ newSyncReplicationState, stage);
} else {
- if (newState == SyncReplicationState.NONE) {
- LOG.warn("The new sync replication state for peer {} has already been clear, and the " +
- "current state is {}, this should be a retry, give up", peerId, newState);
+ if (newSyncReplicationState == SyncReplicationState.NONE) {
+ LOG.warn(
+ "The new sync replication state for peer {} has already been clear, and the " +
+ "current state is {}, this should be a retry, give up",
+ peerId, newSyncReplicationState);
return;
}
+ if (newSyncReplicationState == SyncReplicationState.STANDBY) {
+ replicationSourceManager.drainSources(peerId);
+ // Need to roll the wals and make the ReplicationSource for this peer track the new file.
+ // If we do not do this, there will be two problems that can not be addressed at the same
+ // time. First, if we just throw away the current wal file, and later when we transit the
+ // peer to DA, and the wal has not been rolled yet, then the new data written to the wal
+ // file will not be replicated and cause data inconsistency. But if we just track the
+ // current wal file without rolling, it may contain some data before we transit the peer
+ // to S, later if we transit the peer to DA, the data will also be replicated and cause
+ // data inconsistency. So here we need to roll the wal, and let the ReplicationSource
+ // track the new wal file, and throw the old wal files away.
+ LogRoller roller = rs.getWalRoller();
+ roller.requestRollAll();
+ try {
+ roller.waitUntilWalRollFinished();
+ } catch (InterruptedException e) {
+ // reset the interrupted flag
+ Thread.currentThread().interrupt();
+ throw (IOException) new InterruptedIOException(
+ "Interrupted while waiting for wal roll finish").initCause(e);
+ }
+ }
SyncReplicationState oldState = peer.getSyncReplicationState();
- peerActionListener.peerSyncReplicationStateChange(peerId, oldState, newState, stage);
+ peerActionListener.peerSyncReplicationStateChange(peerId, oldState, newSyncReplicationState,
+ stage);
peer.transitSyncReplicationState();
}
} finally {
http://git-wip-us.apache.org/repos/asf/hbase/blob/cb3ca05e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 7313f13..1a718a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -499,6 +499,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
if (this.replicationEndpoint != null) {
this.replicationEndpoint.stop();
}
+ metrics.clear();
if (join) {
for (ReplicationSourceShipper worker : workers) {
Threads.shutdown(worker, this.sleepForRetries);
http://git-wip-us.apache.org/repos/asf/hbase/blob/cb3ca05e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 5015129..f25b073 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
@@ -391,11 +392,83 @@ public class ReplicationSourceManager implements ReplicationListener {
}
/**
+ * <p>
+ * This is used when we transit a sync replication peer to {@link SyncReplicationState#STANDBY}.
+ * </p>
+ * <p>
+ * When transiting to {@link SyncReplicationState#STANDBY}, we can remove all the pending wal
+ * files for a replication peer as we do not need to replicate them any more. And this is
+ * necessary, otherwise when we transit back to {@link SyncReplicationState#DOWNGRADE_ACTIVE}
+ * later, the stale data will be replicated again and cause inconsistency.
+ * </p>
+ * <p>
+ * See HBASE-20426 for more details.
+ * </p>
+ * @param peerId the id of the sync replication peer
+ */
+ public void drainSources(String peerId) throws IOException, ReplicationException {
+ String terminateMessage = "Sync replication peer " + peerId +
+ " is transiting to STANDBY. Will close the previous replication source and open a new one";
+ ReplicationPeer peer = replicationPeers.getPeer(peerId);
+ assert peer.getPeerConfig().isSyncReplication();
+ ReplicationSourceInterface src = createSource(peerId, peer);
+ // synchronized here to avoid race with preLogRoll where we add new log to source and also
+ // walsById.
+ ReplicationSourceInterface toRemove;
+ Map<String, NavigableSet<String>> wals = new HashMap<>();
+ synchronized (latestPaths) {
+ toRemove = sources.put(peerId, src);
+ if (toRemove != null) {
+ LOG.info("Terminate replication source for " + toRemove.getPeerId());
+ toRemove.terminate(terminateMessage);
+ toRemove.getSourceMetrics().clear();
+ }
+ // Here we make a copy of all the remaining wal files and then delete them from the
+ // replication queue storage after releasing the lock. It is not safe to just remove the old
+ // map from walsById since later we may fail to delete them from the replication queue
+ // storage, and when we retry next time, we can not know the wal files that need to be deleted
+ // from the replication queue storage.
+ walsById.get(peerId).forEach((k, v) -> wals.put(k, new TreeSet<>(v)));
+ }
+ LOG.info("Startup replication source for " + src.getPeerId());
+ src.startup();
+ for (NavigableSet<String> walsByGroup : wals.values()) {
+ for (String wal : walsByGroup) {
+ queueStorage.removeWAL(server.getServerName(), peerId, wal);
+ }
+ }
+ synchronized (walsById) {
+ Map<String, NavigableSet<String>> oldWals = walsById.get(peerId);
+ wals.forEach((k, v) -> {
+ NavigableSet<String> walsByGroup = oldWals.get(k);
+ if (walsByGroup != null) {
+ walsByGroup.removeAll(v);
+ }
+ });
+ }
+ // synchronized on oldsources to avoid race with NodeFailoverWorker. Since NodeFailoverWorker is
+ // a background task, we will delete the file from replication queue storage under the lock to
+ // simplify the logic.
+ synchronized (this.oldsources) {
+ for (Iterator<ReplicationSourceInterface> iter = oldsources.iterator(); iter.hasNext();) {
+ ReplicationSourceInterface oldSource = iter.next();
+ if (oldSource.getPeerId().equals(peerId)) {
+ String queueId = oldSource.getQueueId();
+ oldSource.terminate(terminateMessage);
+ oldSource.getSourceMetrics().clear();
+ queueStorage.removeQueue(server.getServerName(), queueId);
+ walsByIdRecoveredQueues.remove(queueId);
+ iter.remove();
+ }
+ }
+ }
+ }
+
+ /**
* Close the previous replication sources of this peer id and open new sources to trigger the new
* replication state changes or new replication config changes. Here we don't need to change
* replication queue storage and only to enqueue all logs to the new replication source
* @param peerId the id of the replication peer
- * @throws IOException
*/
public void refreshSources(String peerId) throws IOException {
String terminateMessage = "Peer " + peerId +
@@ -409,7 +482,7 @@ public class ReplicationSourceManager implements ReplicationListener {
LOG.info("Terminate replication source for " + toRemove.getPeerId());
toRemove.terminate(terminateMessage);
}
- for (SortedSet<String> walsByGroup : walsById.get(peerId).values()) {
+ for (NavigableSet<String> walsByGroup : walsById.get(peerId).values()) {
walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal)));
}
}
@@ -830,18 +903,6 @@ public class ReplicationSourceManager implements ReplicationListener {
actualPeerId);
continue;
}
- // track sources in walsByIdRecoveredQueues
- Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
- walsByIdRecoveredQueues.put(queueId, walsByGroup);
- for (String wal : walsSet) {
- String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
- NavigableSet<String> wals = walsByGroup.get(walPrefix);
- if (wals == null) {
- wals = new TreeSet<>();
- walsByGroup.put(walPrefix, wals);
- }
- wals.add(wal);
- }
ReplicationSourceInterface src = createSource(queueId, peer);
// synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
@@ -849,9 +910,36 @@ public class ReplicationSourceManager implements ReplicationListener {
peer = replicationPeers.getPeer(src.getPeerId());
if (peer == null || !isOldPeer(src.getPeerId(), peer)) {
src.terminate("Recovered queue doesn't belong to any current peer");
- removeRecoveredSource(src);
+ deleteQueue(queueId);
continue;
}
+ // Do not setup recovered queue if a sync replication peer is in STANDBY state, or is
+ // transiting to STANDBY state. The only exception is we are in STANDBY state and
+ // transiting to DA, under this state we will replay the remote WAL and they need to be
+ // replicated back.
+ if (peer.getPeerConfig().isSyncReplication()) {
+ Pair<SyncReplicationState, SyncReplicationState> stateAndNewState =
+ peer.getSyncReplicationStateAndNewState();
+ if ((stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY) &&
+ stateAndNewState.getSecond().equals(SyncReplicationState.NONE)) ||
+ stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY)) {
+ src.terminate("Sync replication peer is in STANDBY state");
+ deleteQueue(queueId);
+ continue;
+ }
+ }
+ // track sources in walsByIdRecoveredQueues
+ Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
+ walsByIdRecoveredQueues.put(queueId, walsByGroup);
+ for (String wal : walsSet) {
+ String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+ NavigableSet<String> wals = walsByGroup.get(walPrefix);
+ if (wals == null) {
+ wals = new TreeSet<>();
+ walsByGroup.put(walPrefix, wals);
+ }
+ wals.add(wal);
+ }
oldsources.add(src);
for (String wal : walsSet) {
src.enqueueLog(new Path(oldLogDir, wal));
http://git-wip-us.apache.org/repos/asf/hbase/blob/cb3ca05e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java
new file mode 100644
index 0000000..5da7870
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDrainReplicationQueuesForStandBy.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.replication.SyncReplicationTestBase;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/** Verifies that edits queued before a cluster enters STANDBY are not replicated afterwards. */
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestDrainReplicationQueuesForStandBy extends SyncReplicationTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestDrainReplicationQueuesForStandBy.class);
+
+  @Test
+  public void test() throws Exception {
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.ACTIVE);
+    // the peer is disabled on cluster1 during the write, so these edits should stay queued there
+    UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
+    write(UTIL1, 0, 100);
+
+    HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
+    String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(
+      ((AbstractFSWAL<?>) rs.getWAL(RegionInfoBuilder.newBuilder(TABLE_NAME).build()))
+        .getCurrentFileName().getName());
+    // transit cluster2 to DA, check the written rows on it, and then transit cluster1 to S
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.DOWNGRADE_ACTIVE);
+    verify(UTIL2, 0, 100);
+
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    // delete the original value, and then major compact
+    try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) {
+      for (int i = 0; i < 100; i++) {
+        table.delete(new Delete(Bytes.toBytes(i)));
+      }
+    }
+    UTIL2.flush(TABLE_NAME);
+    UTIL2.compact(TABLE_NAME, true);
+    // wait until the deletes are replicated back to cluster1 (the last row is gone there)
+    HRegion region = rs.getRegions(TABLE_NAME).get(0);
+    UTIL1.waitFor(30000, new ExplainingPredicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return region.get(new Get(Bytes.toBytes(99))).isEmpty();
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "Replication has not caught up yet";
+      }
+    });
+    // transit cluster1 to DA and cluster2 to S, then we will start replicating from cluster1 to
+    // cluster2
+    UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.DOWNGRADE_ACTIVE);
+    UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+      SyncReplicationState.STANDBY);
+    UTIL1.getAdmin().enableReplicationPeer(PEER_ID);
+
+    // confirm that we will not replicate the old data which causes inconsistency
+    ReplicationSource source = (ReplicationSource) ((Replication) rs.getReplicationSourceService())
+      .getReplicationManager().getSource(PEER_ID);
+    UTIL1.waitFor(30000, new ExplainingPredicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return !source.workerThreads.containsKey(walGroupId);
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "Replication source still has a worker thread for wal group " + walGroupId;
+      }
+    });
+    HRegion region2 = UTIL2.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
+    for (int i = 0; i < 100; i++) {
+      assertTrue(region2.get(new Get(Bytes.toBytes(i))).isEmpty());
+    }
+  }
+}
\ No newline at end of file