You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/02 02:59:22 UTC
[32/43] hbase git commit: HBASE-19957 General framework to transit sync replication state
HBASE-19957 General framework to transit sync replication state
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8c53906e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8c53906e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8c53906e
Branch: refs/heads/HBASE-19064
Commit: 8c53906ef2fbbac96c5d93d0a73446f200dea573
Parents: 435391f
Author: zhangduo <zh...@apache.org>
Authored: Fri Feb 9 18:33:28 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed May 2 10:51:10 2018 +0800
----------------------------------------------------------------------
.../replication/ReplicationPeerConfig.java | 2 -
.../replication/ReplicationPeerDescription.java | 5 +-
.../hbase/replication/SyncReplicationState.java | 19 +-
.../org/apache/hadoop/hbase/HConstants.java | 3 +
.../src/main/protobuf/MasterProcedure.proto | 20 +-
.../hbase/replication/ReplicationPeerImpl.java | 45 ++++-
.../replication/ReplicationPeerStorage.java | 25 ++-
.../hbase/replication/ReplicationPeers.java | 27 ++-
.../replication/ZKReplicationPeerStorage.java | 63 +++++--
.../hbase/coprocessor/MasterObserver.java | 7 +-
.../org/apache/hadoop/hbase/master/HMaster.java | 4 +-
.../hbase/master/MasterCoprocessorHost.java | 12 +-
.../replication/AbstractPeerProcedure.java | 14 +-
.../master/replication/ModifyPeerProcedure.java | 11 --
.../replication/RefreshPeerProcedure.java | 18 +-
.../replication/ReplicationPeerManager.java | 89 +++++----
...ransitPeerSyncReplicationStateProcedure.java | 181 ++++++++++++-------
.../hbase/regionserver/HRegionServer.java | 35 ++--
.../regionserver/ReplicationSourceService.java | 11 +-
.../regionserver/PeerActionListener.java | 4 +-
.../regionserver/PeerProcedureHandler.java | 16 +-
.../regionserver/PeerProcedureHandlerImpl.java | 52 +++++-
.../regionserver/RefreshPeerCallable.java | 7 +
.../replication/regionserver/Replication.java | 22 ++-
.../regionserver/ReplicationSourceManager.java | 41 +++--
.../SyncReplicationPeerInfoProvider.java | 43 +++++
.../SyncReplicationPeerInfoProviderImpl.java | 71 ++++++++
.../SyncReplicationPeerMappingManager.java | 48 +++++
.../SyncReplicationPeerProvider.java | 35 ----
.../hbase/wal/SyncReplicationWALProvider.java | 35 ++--
.../org/apache/hadoop/hbase/wal/WALFactory.java | 47 ++---
.../replication/TestReplicationAdmin.java | 3 +-
.../TestReplicationSourceManager.java | 5 +-
.../wal/TestSyncReplicationWALProvider.java | 36 ++--
34 files changed, 743 insertions(+), 313 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c53906e/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index 997a155..cc7b4bc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.replication;
import java.util.Collection;
@@ -25,7 +24,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c53906e/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java
index 2d077c5..b0c27bb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java
@@ -20,7 +20,10 @@ package org.apache.hadoop.hbase.replication;
import org.apache.yetus.audience.InterfaceAudience;
/**
- * The POJO equivalent of ReplicationProtos.ReplicationPeerDescription
+ * The POJO equivalent of ReplicationProtos.ReplicationPeerDescription.
+ * <p>
+ * Note for developers: we do not store the new sync replication state here since it is just an
+ * intermediate state and this class is public.
*/
@InterfaceAudience.Public
public class ReplicationPeerDescription {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c53906e/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java
index a65b144..de9576c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java
@@ -29,14 +29,19 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
/**
* Used by synchronous replication. Indicate the state of the current cluster in a synchronous
* replication peer. The state may be one of {@link SyncReplicationState#ACTIVE},
- * {@link SyncReplicationState#DOWNGRADE_ACTIVE} or
- * {@link SyncReplicationState#STANDBY}.
+ * {@link SyncReplicationState#DOWNGRADE_ACTIVE} or {@link SyncReplicationState#STANDBY}.
* <p>
* For asynchronous replication, the state is {@link SyncReplicationState#NONE}.
*/
@InterfaceAudience.Public
public enum SyncReplicationState {
- NONE, ACTIVE, DOWNGRADE_ACTIVE, STANDBY;
+ NONE(0), ACTIVE(1), DOWNGRADE_ACTIVE(2), STANDBY(3);
+
+ private final byte value;
+
+ private SyncReplicationState(int value) {
+ this.value = (byte) value;
+ }
public static SyncReplicationState valueOf(int value) {
switch (value) {
@@ -53,13 +58,17 @@ public enum SyncReplicationState {
}
}
+ public int value() {
+ return value & 0xFF;
+ }
+
public static byte[] toByteArray(SyncReplicationState state) {
return ProtobufUtil
- .prependPBMagic(ReplicationPeerConfigUtil.toSyncReplicationState(state).toByteArray());
+ .prependPBMagic(ReplicationPeerConfigUtil.toSyncReplicationState(state).toByteArray());
}
public static SyncReplicationState parseFrom(byte[] bytes) throws InvalidProtocolBufferException {
return ReplicationPeerConfigUtil.toSyncReplicationState(ReplicationProtos.SyncReplicationState
- .parseFrom(Arrays.copyOfRange(bytes, ProtobufUtil.lengthOfPBMagic(), bytes.length)));
+ .parseFrom(Arrays.copyOfRange(bytes, ProtobufUtil.lengthOfPBMagic(), bytes.length)));
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c53906e/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 9241682..522c2cf 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1355,6 +1355,9 @@ public final class HConstants {
public static final String NOT_IMPLEMENTED = "Not implemented";
+ // TODO: need to find a better place to hold it.
+ public static final String SYNC_REPLICATION_ENABLED = "hbase.replication.sync.enabled";
+
private HConstants() {
// Can't be instantiated with this ctor.
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c53906e/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index 39fc72a..67c1b43 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -389,6 +389,17 @@ enum PeerModificationState {
POST_PEER_MODIFICATION = 8;
}
+enum PeerSyncReplicationStateTransitionState {
+ PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION = 1;
+ SET_PEER_NEW_SYNC_REPLICATION_STATE = 2;
+ REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN = 3;
+ REPLAY_REMOTE_WAL_IN_PEER = 4;
+ REOPEN_ALL_REGIONS_IN_PEER = 5;
+ TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE = 6;
+ REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END = 7;
+ POST_PEER_SYNC_REPLICATION_STATE_TRANSITION = 8;
+}
+
message PeerModificationStateData {
required string peer_id = 1;
}
@@ -399,18 +410,23 @@ enum PeerModificationType {
ENABLE_PEER = 3;
DISABLE_PEER = 4;
UPDATE_PEER_CONFIG = 5;
+ TRANSIT_SYNC_REPLICATION_STATE = 6;
}
message RefreshPeerStateData {
required string peer_id = 1;
required PeerModificationType type = 2;
required ServerName target_server = 3;
+ /** We need multiple stages for sync replication state transition **/
+ optional uint32 stage = 4 [default = 0];
}
message RefreshPeerParameter {
required string peer_id = 1;
required PeerModificationType type = 2;
required ServerName target_server = 3;
+ /** We need multiple stages for sync replication state transition **/
+ optional uint32 stage = 4 [default = 0];;
}
message PeerProcedureStateData {
@@ -438,5 +454,7 @@ message DisablePeerStateData {
}
message TransitPeerSyncReplicationStateStateData {
- required SyncReplicationState syncReplicationState = 1;
+ /** Could be null if we fail in pre check, so optional */
+ optional SyncReplicationState fromState = 1;
+ required SyncReplicationState toState = 2;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c53906e/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
index ff3f662..22026e5 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
@@ -36,7 +37,14 @@ public class ReplicationPeerImpl implements ReplicationPeer {
private volatile PeerState peerState;
- private volatile SyncReplicationState syncReplicationState;
+ // The lower 16 bits are the current sync replication state, the higher 16 bits are the new sync
+ // replication state. Both are packed into a single int so that readers can never observe an
+ // inconsistent view of the state and the new state.
+ private volatile int syncReplicationStateBits;
+
+ private static final int SHIFT = 16;
+
+ private static final int AND_BITS = 0xFFFF;
private final List<ReplicationPeerConfigListener> peerConfigListeners;
@@ -48,12 +56,14 @@ public class ReplicationPeerImpl implements ReplicationPeer {
* @param peerConfig configuration for the replication peer
*/
public ReplicationPeerImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig,
- boolean peerState, SyncReplicationState syncReplicationState) {
+ boolean peerState, SyncReplicationState syncReplicationState,
+ SyncReplicationState newSyncReplicationState) {
this.conf = conf;
this.id = id;
this.peerState = peerState ? PeerState.ENABLED : PeerState.DISABLED;
this.peerConfig = peerConfig;
- this.syncReplicationState = syncReplicationState;
+ this.syncReplicationStateBits =
+ syncReplicationState.value() | (newSyncReplicationState.value() << SHIFT);
this.peerConfigListeners = new ArrayList<>();
}
@@ -66,6 +76,16 @@ public class ReplicationPeerImpl implements ReplicationPeer {
peerConfigListeners.forEach(listener -> listener.peerConfigUpdated(peerConfig));
}
+ public void setNewSyncReplicationState(SyncReplicationState newState) {
+ this.syncReplicationStateBits =
+ (this.syncReplicationStateBits & AND_BITS) | (newState.value() << SHIFT);
+ }
+
+ public void transitSyncReplicationState() {
+ this.syncReplicationStateBits =
+ (this.syncReplicationStateBits >>> SHIFT) | (SyncReplicationState.NONE.value() << SHIFT);
+ }
+
/**
* Get the identifier of this peer
* @return string representation of the id (short)
@@ -80,9 +100,26 @@ public class ReplicationPeerImpl implements ReplicationPeer {
return peerState;
}
+ private static SyncReplicationState getSyncReplicationState(int bits) {
+ return SyncReplicationState.valueOf(bits & AND_BITS);
+ }
+
+ private static SyncReplicationState getNewSyncReplicationState(int bits) {
+ return SyncReplicationState.valueOf(bits >>> SHIFT);
+ }
+
+ public Pair<SyncReplicationState, SyncReplicationState> getSyncReplicationStateAndNewState() {
+ int bits = this.syncReplicationStateBits;
+ return Pair.newPair(getSyncReplicationState(bits), getNewSyncReplicationState(bits));
+ }
+
+ public SyncReplicationState getNewSyncReplicationState() {
+ return getNewSyncReplicationState(syncReplicationStateBits);
+ }
+
@Override
public SyncReplicationState getSyncReplicationState() {
- return syncReplicationState;
+ return getSyncReplicationState(syncReplicationStateBits);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c53906e/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
index d2538ab..f74ac37 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.replication;
import java.util.List;
-
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -72,16 +71,30 @@ public interface ReplicationPeerStorage {
ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException;
/**
- * Set the state of current cluster in a synchronous replication peer.
+ * Set the new sync replication state that we are going to transit to.
* @throws ReplicationException if there are errors accessing the storage service.
*/
- void setPeerSyncReplicationState(String peerId, SyncReplicationState state)
+ void setPeerNewSyncReplicationState(String peerId, SyncReplicationState state)
throws ReplicationException;
/**
- * Get the state of current cluster in a synchronous replication peer.
+ * Overwrite the sync replication state with the new sync replication state which is set with the
+ * {@link #setPeerNewSyncReplicationState(String, SyncReplicationState)} method above, and clear
+ * the new sync replication state.
* @throws ReplicationException if there are errors accessing the storage service.
*/
- SyncReplicationState getPeerSyncReplicationState(String peerId)
- throws ReplicationException;
+ void transitPeerSyncReplicationState(String peerId) throws ReplicationException;
+
+ /**
+ * Get the sync replication state.
+ * @throws ReplicationException if there are errors accessing the storage service.
+ */
+ SyncReplicationState getPeerSyncReplicationState(String peerId) throws ReplicationException;
+
+ /**
+ * Get the new sync replication state. Will return {@link SyncReplicationState#NONE} if we are
+ * not in a transition.
+ * @throws ReplicationException if there are errors accessing the storage service.
+ */
+ SyncReplicationState getPeerNewSyncReplicationState(String peerId) throws ReplicationException;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c53906e/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index a54f339..ba6da7a 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -80,8 +80,8 @@ public class ReplicationPeers {
return true;
}
- public void removePeer(String peerId) {
- peerCache.remove(peerId);
+ public ReplicationPeerImpl removePeer(String peerId) {
+ return peerCache.remove(peerId);
}
/**
@@ -110,22 +110,29 @@ public class ReplicationPeers {
public PeerState refreshPeerState(String peerId) throws ReplicationException {
ReplicationPeerImpl peer = peerCache.get(peerId);
- if (peer == null) {
- throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
- }
peer.setPeerState(peerStorage.isPeerEnabled(peerId));
return peer.getPeerState();
}
public ReplicationPeerConfig refreshPeerConfig(String peerId) throws ReplicationException {
ReplicationPeerImpl peer = peerCache.get(peerId);
- if (peer == null) {
- throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
- }
peer.setPeerConfig(peerStorage.getPeerConfig(peerId));
return peer.getPeerConfig();
}
+ public SyncReplicationState refreshPeerNewSyncReplicationState(String peerId)
+ throws ReplicationException {
+ ReplicationPeerImpl peer = peerCache.get(peerId);
+ SyncReplicationState newState = peerStorage.getPeerNewSyncReplicationState(peerId);
+ peer.setNewSyncReplicationState(newState);
+ return newState;
+ }
+
+ public void transitPeerSyncReplicationState(String peerId) {
+ ReplicationPeerImpl peer = peerCache.get(peerId);
+ peer.transitSyncReplicationState();
+ }
+
/**
* Helper method to connect to a peer
* @param peerId peer's identifier
@@ -135,7 +142,9 @@ public class ReplicationPeers {
ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
boolean enabled = peerStorage.isPeerEnabled(peerId);
SyncReplicationState syncReplicationState = peerStorage.getPeerSyncReplicationState(peerId);
+ SyncReplicationState newSyncReplicationState =
+ peerStorage.getPeerNewSyncReplicationState(peerId);
return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf),
- peerId, peerConfig, enabled, syncReplicationState);
+ peerId, peerConfig, enabled, syncReplicationState, newSyncReplicationState);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c53906e/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
index 9107cf6..a2cdfdf 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
@@ -53,7 +53,12 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
public static final byte[] DISABLED_ZNODE_BYTES =
toByteArray(ReplicationProtos.ReplicationState.State.DISABLED);
- public static final String SYNCHRONOUS_REPLICATION_STATE_ZNODE = "sync-rep-state";
+ public static final String SYNC_REPLICATION_STATE_ZNODE = "sync-rep-state";
+
+ public static final String NEW_SYNC_REPLICATION_STATE_ZNODE = "new-sync-rep-state";
+
+ public static final byte[] NONE_STATE_ZNODE_BYTES =
+ SyncReplicationState.toByteArray(SyncReplicationState.NONE);
/**
* The name of the znode that contains the replication status of a remote slave (i.e. peer)
@@ -85,7 +90,11 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
@VisibleForTesting
public String getSyncReplicationStateNode(String peerId) {
- return ZNodePaths.joinZNode(getPeerNode(peerId), SYNCHRONOUS_REPLICATION_STATE_ZNODE);
+ return ZNodePaths.joinZNode(getPeerNode(peerId), SYNC_REPLICATION_STATE_ZNODE);
+ }
+
+ private String getNewSyncReplicationStateNode(String peerId) {
+ return ZNodePaths.joinZNode(getPeerNode(peerId), NEW_SYNC_REPLICATION_STATE_ZNODE);
}
@Override
@@ -97,14 +106,15 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId),
enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES),
ZKUtilOp.createAndFailSilent(getSyncReplicationStateNode(peerId),
- SyncReplicationState.toByteArray(syncReplicationState)));
+ SyncReplicationState.toByteArray(syncReplicationState)),
+ ZKUtilOp.createAndFailSilent(getNewSyncReplicationStateNode(peerId), NONE_STATE_ZNODE_BYTES));
try {
ZKUtil.createWithParents(zookeeper, peersZNode);
ZKUtil.multiOrSequential(zookeeper, multiOps, false);
} catch (KeeperException e) {
throw new ReplicationException(
"Could not add peer with id=" + peerId + ", peerConfig=>" + peerConfig + ", state=" +
- (enabled ? "ENABLED" : "DISABLED") + ", syncReplicationState=" + syncReplicationState,
+ (enabled ? "ENABLED" : "DISABLED") + ", syncReplicationState=" + syncReplicationState,
e);
}
}
@@ -136,7 +146,7 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
ReplicationPeerConfigUtil.toByteArray(peerConfig));
} catch (KeeperException e) {
throw new ReplicationException(
- "There was a problem trying to save changes to the " + "replication peer " + peerId, e);
+ "There was a problem trying to save changes to the " + "replication peer " + peerId, e);
}
}
@@ -170,38 +180,63 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
}
if (data == null || data.length == 0) {
throw new ReplicationException(
- "Replication peer config data shouldn't be empty, peerId=" + peerId);
+ "Replication peer config data shouldn't be empty, peerId=" + peerId);
}
try {
return ReplicationPeerConfigUtil.parsePeerFrom(data);
} catch (DeserializationException e) {
throw new ReplicationException(
- "Failed to parse replication peer config for peer with id=" + peerId, e);
+ "Failed to parse replication peer config for peer with id=" + peerId, e);
}
}
@Override
- public void setPeerSyncReplicationState(String peerId, SyncReplicationState state)
+ public void setPeerNewSyncReplicationState(String peerId, SyncReplicationState state)
throws ReplicationException {
try {
- ZKUtil.setData(zookeeper, getSyncReplicationStateNode(peerId),
+ ZKUtil.createSetData(zookeeper, getNewSyncReplicationStateNode(peerId),
SyncReplicationState.toByteArray(state));
} catch (KeeperException e) {
throw new ReplicationException(
- "Unable to change the cluster state for the synchronous replication peer with id=" + peerId,
- e);
+ "Unable to set the new sync replication state for peer with id=" + peerId, e);
}
}
@Override
- public SyncReplicationState getPeerSyncReplicationState(String peerId)
+ public void transitPeerSyncReplicationState(String peerId) throws ReplicationException {
+ String newStateNode = getNewSyncReplicationStateNode(peerId);
+ try {
+ byte[] data = ZKUtil.getData(zookeeper, newStateNode);
+ ZKUtil.multiOrSequential(zookeeper,
+ Arrays.asList(ZKUtilOp.setData(newStateNode, NONE_STATE_ZNODE_BYTES),
+ ZKUtilOp.setData(getSyncReplicationStateNode(peerId), data)),
+ false);
+ } catch (KeeperException | InterruptedException e) {
+ throw new ReplicationException(
+ "Error transiting sync replication state for peer with id=" + peerId, e);
+ }
+ }
+
+ private SyncReplicationState getSyncReplicationState(String peerId, String path)
throws ReplicationException {
try {
- byte[] data = ZKUtil.getData(zookeeper, getSyncReplicationStateNode(peerId));
+ byte[] data = ZKUtil.getData(zookeeper, path);
return SyncReplicationState.parseFrom(data);
} catch (KeeperException | InterruptedException | IOException e) {
throw new ReplicationException(
- "Error getting cluster state for the synchronous replication peer with id=" + peerId, e);
+ "Error getting sync replication state of path " + path + " for peer with id=" + peerId, e);
}
}
+
+ @Override
+ public SyncReplicationState getPeerNewSyncReplicationState(String peerId)
+ throws ReplicationException {
+ return getSyncReplicationState(peerId, getNewSyncReplicationStateNode(peerId));
+ }
+
+ @Override
+ public SyncReplicationState getPeerSyncReplicationState(String peerId)
+ throws ReplicationException {
+ return getSyncReplicationState(peerId, getSyncReplicationStateNode(peerId));
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c53906e/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
index 8d2b55f..ba340cb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
@@ -1236,7 +1236,7 @@ public interface MasterObserver {
* Called before transit current cluster state for the specified synchronous replication peer
* @param ctx the environment to interact with the framework and master
* @param peerId a short name that identifies the peer
- * @param state a new state
+ * @param state the new state
*/
default void preTransitReplicationPeerSyncReplicationState(
final ObserverContext<MasterCoprocessorEnvironment> ctx, String peerId,
@@ -1247,11 +1247,12 @@ public interface MasterObserver {
* Called after transit current cluster state for the specified synchronous replication peer
* @param ctx the environment to interact with the framework and master
* @param peerId a short name that identifies the peer
- * @param state a new state
+ * @param from the old state
+ * @param to the new state
*/
default void postTransitReplicationPeerSyncReplicationState(
final ObserverContext<MasterCoprocessorEnvironment> ctx, String peerId,
- SyncReplicationState state) throws IOException {
+ SyncReplicationState from, SyncReplicationState to) throws IOException {
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c53906e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index e90c43b..2809efa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -130,10 +130,10 @@ import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure;
import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
+import org.apache.hadoop.hbase.master.replication.AbstractPeerProcedure;
import org.apache.hadoop.hbase.master.replication.AddPeerProcedure;
import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure;
import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure;
-import org.apache.hadoop.hbase.master.replication.ModifyPeerProcedure;
import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStateProcedure;
@@ -3405,7 +3405,7 @@ public class HMaster extends HRegionServer implements MasterServices {
return favoredNodesManager;
}
- private long executePeerProcedure(ModifyPeerProcedure procedure) throws IOException {
+ private long executePeerProcedure(AbstractPeerProcedure<?> procedure) throws IOException {
long procId = procedureExecutor.submitProcedure(procedure);
procedure.getLatch().await();
return procId;
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c53906e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
index cc008bd..158a1d0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
@@ -1531,22 +1531,22 @@ public class MasterCoprocessorHost
});
}
- public void preTransitReplicationPeerSyncReplicationState(final String peerId,
- final SyncReplicationState clusterState) throws IOException {
+ public void preTransitReplicationPeerSyncReplicationState(String peerId,
+ SyncReplicationState state) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
- observer.preTransitReplicationPeerSyncReplicationState(this, peerId, clusterState);
+ observer.preTransitReplicationPeerSyncReplicationState(this, peerId, state);
}
});
}
- public void postTransitReplicationPeerSyncReplicationState(final String peerId,
- final SyncReplicationState clusterState) throws IOException {
+ public void postTransitReplicationPeerSyncReplicationState(String peerId,
+ SyncReplicationState from, SyncReplicationState to) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
- observer.postTransitReplicationPeerSyncReplicationState(this, peerId, clusterState);
+ observer.postTransitReplicationPeerSyncReplicationState(this, peerId, from, to);
}
});
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c53906e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
index 0ad8a63..6679d78 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
@@ -46,7 +46,7 @@ public abstract class AbstractPeerProcedure<TState>
protected AbstractPeerProcedure(String peerId) {
this.peerId = peerId;
- this.latch = ProcedurePrepareLatch.createLatch(2, 0);
+ this.latch = ProcedurePrepareLatch.createLatch(2, 1);
}
public ProcedurePrepareLatch getLatch() {
@@ -94,4 +94,16 @@ public abstract class AbstractPeerProcedure<TState>
super.deserializeStateData(serializer);
peerId = serializer.deserialize(PeerProcedureStateData.class).getPeerId();
}
+
+ @Override
+ protected void rollbackState(MasterProcedureEnv env, TState state)
+ throws IOException, InterruptedException {
+ // Peer related operations have no real rollback. The only state we accept a rollback from is
+ // the initial one, since no modification has been done on the peer storage at that point.
+ if (state != getInitialState()) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c53906e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
index ea2e314..32b8ea1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
@@ -328,17 +328,6 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
}
@Override
- protected void rollbackState(MasterProcedureEnv env, PeerModificationState state)
- throws IOException, InterruptedException {
- if (state == PeerModificationState.PRE_PEER_MODIFICATION) {
- // actually the peer related operations has no rollback, but if we haven't done any
- // modifications on the peer storage yet, we can just return.
- return;
- }
- throw new UnsupportedOperationException();
- }
-
- @Override
protected PeerModificationState getState(int stateId) {
return PeerModificationState.forNumber(stateId);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c53906e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
index ba9bcdc..d51ea63 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
@@ -54,6 +54,8 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
justification = "Will never change after construction")
private ServerName targetServer;
+ private int stage;
+
private boolean dispatched;
private ProcedureEvent<?> event;
@@ -64,9 +66,15 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
}
public RefreshPeerProcedure(String peerId, PeerOperationType type, ServerName targetServer) {
+ this(peerId, type, targetServer, 0);
+ }
+
+ public RefreshPeerProcedure(String peerId, PeerOperationType type, ServerName targetServer,
+ int stage) {
this.peerId = peerId;
this.type = type;
this.targetServer = targetServer;
+ this.stage = stage;
}
@Override
@@ -91,6 +99,8 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
return PeerModificationType.DISABLE_PEER;
case UPDATE_CONFIG:
return PeerModificationType.UPDATE_PEER_CONFIG;
+ case TRANSIT_SYNC_REPLICATION_STATE:
+ return PeerModificationType.TRANSIT_SYNC_REPLICATION_STATE;
default:
throw new IllegalArgumentException("Unknown type: " + type);
}
@@ -108,6 +118,8 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
return PeerOperationType.DISABLE;
case UPDATE_PEER_CONFIG:
return PeerOperationType.UPDATE_CONFIG;
+ case TRANSIT_SYNC_REPLICATION_STATE:
+ return PeerOperationType.TRANSIT_SYNC_REPLICATION_STATE;
default:
throw new IllegalArgumentException("Unknown type: " + type);
}
@@ -118,7 +130,8 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
assert targetServer.equals(remote);
return new ServerOperation(this, getProcId(), RefreshPeerCallable.class,
RefreshPeerParameter.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type))
- .setTargetServer(ProtobufUtil.toServerName(remote)).build().toByteArray());
+ .setTargetServer(ProtobufUtil.toServerName(remote)).setStage(stage).build()
+ .toByteArray());
}
private void complete(MasterProcedureEnv env, Throwable error) {
@@ -193,7 +206,7 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
serializer.serialize(
RefreshPeerStateData.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type))
- .setTargetServer(ProtobufUtil.toServerName(targetServer)).build());
+ .setTargetServer(ProtobufUtil.toServerName(targetServer)).setStage(stage).build());
}
@Override
@@ -202,5 +215,6 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
peerId = data.getPeerId();
type = toPeerOperationType(data.getType());
targetServer = ProtobufUtil.toServerName(data.getTargetServer());
+ stage = data.getStage();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c53906e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index ff778a8..0dc922d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@@ -50,6 +49,9 @@ import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+
/**
* Manages and performs all replication admin operations.
* <p>
@@ -64,15 +66,11 @@ public class ReplicationPeerManager {
private final ConcurrentMap<String, ReplicationPeerDescription> peers;
- private final EnumMap<SyncReplicationState, EnumSet<SyncReplicationState>> allowedTransition =
- new EnumMap<SyncReplicationState, EnumSet<SyncReplicationState>>(SyncReplicationState.class) {
- {
- put(SyncReplicationState.ACTIVE, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE));
- put(SyncReplicationState.STANDBY, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE));
- put(SyncReplicationState.DOWNGRADE_ACTIVE,
- EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE));
- }
- };
+ private final ImmutableMap<SyncReplicationState, EnumSet<SyncReplicationState>>
+ allowedTransition = Maps.immutableEnumMap(ImmutableMap.of(SyncReplicationState.ACTIVE,
+ EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE), SyncReplicationState.STANDBY,
+ EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE), SyncReplicationState.DOWNGRADE_ACTIVE,
+ EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE)));
ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
ConcurrentMap<String, ReplicationPeerDescription> peers) {
@@ -165,9 +163,9 @@ public class ReplicationPeerManager {
if (!isStringEquals(peerConfig.getRemoteWALDir(), oldPeerConfig.getRemoteWALDir())) {
throw new DoNotRetryIOException(
- "Changing the remote wal dir on an existing peer is not allowed. Existing remote wal " +
- "dir '" + oldPeerConfig.getRemoteWALDir() + "' for peer " + peerId +
- " does not match new remote wal dir '" + peerConfig.getRemoteWALDir() + "'");
+ "Changing the remote wal dir on an existing peer is not allowed. Existing remote wal " +
+ "dir '" + oldPeerConfig.getRemoteWALDir() + "' for peer " + peerId +
+ " does not match new remote wal dir '" + peerConfig.getRemoteWALDir() + "'");
}
if (oldPeerConfig.isSyncReplication()) {
@@ -180,15 +178,19 @@ public class ReplicationPeerManager {
return desc;
}
- public void preTransitPeerSyncReplicationState(String peerId, SyncReplicationState state)
- throws DoNotRetryIOException {
+ /**
+ * Checks whether the given peer may move from its current sync replication state to
+ * {@code state}, based on the {@code allowedTransition} table.
+ * @param peerId id of the peer, looked up through {@code checkPeerExists}
+ * @param state the sync replication state the caller wants to transit to
+ * @return the old state, i.e. the state the peer is in before the transition.
+ * @throws DoNotRetryIOException if the current state has no allowed target states, or
+ * {@code state} is not one of them
+ */
+ public SyncReplicationState preTransitPeerSyncReplicationState(String peerId,
+ SyncReplicationState state) throws DoNotRetryIOException {
ReplicationPeerDescription desc = checkPeerExists(peerId);
SyncReplicationState fromState = desc.getSyncReplicationState();
EnumSet<SyncReplicationState> allowedToStates = allowedTransition.get(fromState);
if (allowedToStates == null || !allowedToStates.contains(state)) {
throw new DoNotRetryIOException("Can not transit current cluster state from " + fromState +
- " to " + state + " for peer id=" + peerId);
+ " to " + state + " for peer id=" + peerId);
}
+ return fromState;
}
public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
@@ -199,8 +201,8 @@ public class ReplicationPeerManager {
}
ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build();
SyncReplicationState syncReplicationState =
- copiedPeerConfig.isSyncReplication() ? SyncReplicationState.DOWNGRADE_ACTIVE
- : SyncReplicationState.NONE;
+ copiedPeerConfig.isSyncReplication() ? SyncReplicationState.DOWNGRADE_ACTIVE
+ : SyncReplicationState.NONE;
peerStorage.addPeer(peerId, copiedPeerConfig, enabled, syncReplicationState);
peers.put(peerId,
new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig, syncReplicationState));
@@ -240,7 +242,7 @@ public class ReplicationPeerManager {
ReplicationPeerDescription desc = peers.get(peerId);
ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
ReplicationPeerConfigBuilder newPeerConfigBuilder =
- ReplicationPeerConfig.newBuilder(peerConfig);
+ ReplicationPeerConfig.newBuilder(peerConfig);
// we need to use the new conf to overwrite the old one.
newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration());
newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration());
@@ -257,7 +259,7 @@ public class ReplicationPeerManager {
return new ArrayList<>(peers.values());
}
return peers.values().stream().filter(r -> pattern.matcher(r.getPeerId()).matches())
- .collect(Collectors.toList());
+ .collect(Collectors.toList());
}
public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) {
@@ -269,12 +271,23 @@ public class ReplicationPeerManager {
queueStorage.removeLastSequenceIds(peerId);
}
- public void transitPeerSyncReplicationState(String peerId, SyncReplicationState state)
+ public void setPeerNewSyncReplicationState(String peerId, SyncReplicationState state)
throws ReplicationException {
+ peerStorage.setPeerNewSyncReplicationState(peerId, state);
+ }
+
+ public void transitPeerSyncReplicationState(String peerId, SyncReplicationState newState)
+ throws ReplicationException {
+ if (peerStorage.getPeerNewSyncReplicationState(peerId) != SyncReplicationState.NONE) {
+ // Only transit if this is not a retry
+ peerStorage.transitPeerSyncReplicationState(peerId);
+ }
ReplicationPeerDescription desc = peers.get(peerId);
- peerStorage.setPeerSyncReplicationState(peerId, state);
- peers.put(peerId,
- new ReplicationPeerDescription(peerId, desc.isEnabled(), desc.getPeerConfig(), state));
+ if (desc.getSyncReplicationState() != newState) {
+ // Only recreate the desc if this is not a retry
+ peers.put(peerId,
+ new ReplicationPeerDescription(peerId, desc.isEnabled(), desc.getPeerConfig(), newState));
+ }
}
public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
@@ -301,10 +314,10 @@ public class ReplicationPeerManager {
// If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
// Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer
// cluster.
- if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
- || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
- throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly "
- + "when you want replicate all cluster");
+ if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) ||
+ (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
+ throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly " +
+ "when you want replicate all cluster");
}
checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
peerConfig.getExcludeTableCFsMap());
@@ -312,13 +325,13 @@ public class ReplicationPeerManager {
// If replicate_all flag is false, it means all user tables can't be replicated to peer
// cluster. Then allow to config namespaces or table-cfs which will be replicated to peer
// cluster.
- if ((peerConfig.getExcludeNamespaces() != null
- && !peerConfig.getExcludeNamespaces().isEmpty())
- || (peerConfig.getExcludeTableCFsMap() != null
- && !peerConfig.getExcludeTableCFsMap().isEmpty())) {
+ if ((peerConfig.getExcludeNamespaces() != null &&
+ !peerConfig.getExcludeNamespaces().isEmpty()) ||
+ (peerConfig.getExcludeTableCFsMap() != null &&
+ !peerConfig.getExcludeTableCFsMap().isEmpty())) {
throw new DoNotRetryIOException(
- "Need clean exclude-namespaces or exclude-table-cfs config firstly"
- + " when replicate_all flag is false");
+ "Need clean exclude-namespaces or exclude-table-cfs config firstly" +
+ " when replicate_all flag is false");
}
checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
peerConfig.getTableCFsMap());
@@ -338,11 +351,11 @@ public class ReplicationPeerManager {
// TODO: Add namespace, replicat_all flag back
if (peerConfig.replicateAllUserTables()) {
throw new DoNotRetryIOException(
- "Only support replicated table config for sync replication peer");
+ "Only support replicated table config for sync replication peer");
}
if (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) {
throw new DoNotRetryIOException(
- "Only support replicated table config for sync replication peer");
+ "Only support replicated table config for sync replication peer");
}
if (peerConfig.getTableCFsMap() == null || peerConfig.getTableCFsMap().isEmpty()) {
throw new DoNotRetryIOException("Need config replicated tables for sync replication peer");
@@ -350,7 +363,7 @@ public class ReplicationPeerManager {
for (List<String> cfs : peerConfig.getTableCFsMap().values()) {
if (cfs != null && !cfs.isEmpty()) {
throw new DoNotRetryIOException(
- "Only support replicated table config for sync replication peer");
+ "Only support replicated table config for sync replication peer");
}
}
}
@@ -394,7 +407,7 @@ public class ReplicationPeerManager {
private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
throws DoNotRetryIOException {
String filterCSV = peerConfig.getConfiguration()
- .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
+ .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
if (filterCSV != null && !filterCSV.isEmpty()) {
String[] filters = filterCSV.split(",");
for (String filter : filters) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c53906e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
index aad3b06..8fc932f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
@@ -18,11 +18,12 @@
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
-
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
@@ -32,26 +33,29 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerSyncReplicationStateTransitionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.TransitPeerSyncReplicationStateStateData;
/**
- * The procedure for transit current cluster state for a synchronous replication peer.
+ * The procedure for transit current sync replication state for a synchronous replication peer.
*/
@InterfaceAudience.Private
-public class TransitPeerSyncReplicationStateProcedure extends ModifyPeerProcedure {
+public class TransitPeerSyncReplicationStateProcedure
+ extends AbstractPeerProcedure<PeerSyncReplicationStateTransitionState> {
private static final Logger LOG =
LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class);
- private SyncReplicationState state;
+ private SyncReplicationState fromState;
+
+ private SyncReplicationState toState;
public TransitPeerSyncReplicationStateProcedure() {
}
public TransitPeerSyncReplicationStateProcedure(String peerId, SyncReplicationState state) {
super(peerId);
- this.state = state;
+ this.toState = state;
}
@Override
@@ -60,99 +64,154 @@ public class TransitPeerSyncReplicationStateProcedure extends ModifyPeerProcedur
}
@Override
- protected void prePeerModification(MasterProcedureEnv env)
- throws IOException, ReplicationException {
- MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
- if (cpHost != null) {
- cpHost.preTransitReplicationPeerSyncReplicationState(peerId, state);
+ protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ super.serializeStateData(serializer);
+ TransitPeerSyncReplicationStateStateData.Builder builder =
+ TransitPeerSyncReplicationStateStateData.newBuilder()
+ .setToState(ReplicationPeerConfigUtil.toSyncReplicationState(toState));
+ if (fromState != null) {
+ builder.setFromState(ReplicationPeerConfigUtil.toSyncReplicationState(fromState));
}
- env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, state);
+ serializer.serialize(builder.build());
}
@Override
- protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
- env.getReplicationPeerManager().transitPeerSyncReplicationState(peerId, state);
+ protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ super.deserializeStateData(serializer);
+ TransitPeerSyncReplicationStateStateData data =
+ serializer.deserialize(TransitPeerSyncReplicationStateStateData.class);
+ toState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getToState());
+ if (data.hasFromState()) {
+ fromState = ReplicationPeerConfigUtil.toSyncReplicationState(data.getFromState());
+ }
}
@Override
- protected void postPeerModification(MasterProcedureEnv env)
- throws IOException, ReplicationException {
- LOG.info("Successfully transit current cluster state to {} in synchronous replication peer {}",
- state, peerId);
+ protected PeerSyncReplicationStateTransitionState getState(int stateId) {
+ return PeerSyncReplicationStateTransitionState.forNumber(stateId);
+ }
+
+ @Override
+ protected int getStateId(PeerSyncReplicationStateTransitionState state) {
+ return state.getNumber();
+ }
+
+ @Override
+ protected PeerSyncReplicationStateTransitionState getInitialState() {
+ return PeerSyncReplicationStateTransitionState.PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION;
+ }
+
+ private void preTransit(MasterProcedureEnv env) throws IOException {
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
- env.getMasterCoprocessorHost().postTransitReplicationPeerSyncReplicationState(peerId, state);
+ cpHost.preTransitReplicationPeerSyncReplicationState(peerId, toState);
}
+ fromState = env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, toState);
}
- @Override
- protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
- super.serializeStateData(serializer);
- serializer.serialize(TransitPeerSyncReplicationStateStateData.newBuilder()
- .setSyncReplicationState(ReplicationPeerConfigUtil.toSyncReplicationState(state)).build());
+ /**
+ * Logs the finished transition and then calls the master coprocessor post hook, if a
+ * coprocessor host is available.
+ * @param env the master procedure environment used to look up the coprocessor host
+ * @throws IOException if the post hook fails
+ */
+ private void postTransit(MasterProcedureEnv env) throws IOException {
+ LOG.info(
+ "Successfully transit current cluster state from {} to {} for sync replication peer {}",
+ fromState, toState, peerId);
+ MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+ if (cpHost != null) {
+ // Use the null-checked local instead of fetching the host a second time, as preTransit does.
+ cpHost.postTransitReplicationPeerSyncReplicationState(peerId, fromState, toState);
+ }
+ }
- @Override
- protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
- super.deserializeStateData(serializer);
- TransitPeerSyncReplicationStateStateData data =
- serializer.deserialize(TransitPeerSyncReplicationStateStateData.class);
- state = ReplicationPeerConfigUtil.toSyncReplicationState(data.getSyncReplicationState());
+ /**
+ * Collects the regions of every table that appears as a key in the table-CFs map of this peer's
+ * config.
+ */
+ private List<RegionInfo> getRegionsToReopen(MasterProcedureEnv env) {
+ // NOTE(review): Optional.get() presumes the peer config is still present here - confirm.
+ return env.getReplicationPeerManager().getPeerConfig(peerId).get().getTableCFsMap().keySet()
+ .stream().flatMap(tableName -> {
+ return env.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName).stream();
+ }).collect(Collectors.toList());
+ }
+ /**
+ * Runs one step of the sync replication state transition and selects the next state.
+ * <p>
+ * A failure of the pre-transit step marks the procedure as failed. A ReplicationException from
+ * the peer storage updates, or an IOException while scheduling the region reopen, makes the step
+ * retry by throwing ProcedureYieldException. A failure of the post CP hook is only logged.
+ */
@Override
- protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
+ protected Flow executeFromState(MasterProcedureEnv env,
+ PeerSyncReplicationStateTransitionState state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
switch (state) {
- case PRE_PEER_MODIFICATION:
+ case PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION:
try {
- prePeerModification(env);
+ preTransit(env);
} catch (IOException e) {
- LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, " +
- "mark the procedure as failure and give up", getClass().getName(), peerId, e);
- setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e);
- releaseLatch();
+ LOG.warn("Failed to call pre CP hook or the pre check is failed for peer {} " +
+ "when transiting sync replication peer state to {}, " +
+ "mark the procedure as failure and give up", peerId, toState, e);
+ setFailure("master-transit-peer-sync-replication-state", e);
return Flow.NO_MORE_STATE;
- } catch (ReplicationException e) {
- LOG.warn("{} failed to call prePeerModification for peer {}, retry", getClass().getName(),
- peerId, e);
- throw new ProcedureYieldException();
}
- setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
+ setNextState(PeerSyncReplicationStateTransitionState.SET_PEER_NEW_SYNC_REPLICATION_STATE);
return Flow.HAS_MORE_STATE;
- case UPDATE_PEER_STORAGE:
+ case SET_PEER_NEW_SYNC_REPLICATION_STATE:
try {
- updatePeerStorage(env);
+ env.getReplicationPeerManager().setPeerNewSyncReplicationState(peerId, toState);
} catch (ReplicationException e) {
- LOG.warn("{} update peer storage for peer {} failed, retry", getClass().getName(), peerId,
- e);
+ LOG.warn("Failed to update peer storage for peer {} when starting transiting sync " +
+ "replication peer state from {} to {}, retry", peerId, fromState, toState, e);
throw new ProcedureYieldException();
}
- setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
+ setNextState(
+ PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN);
+ return Flow.HAS_MORE_STATE;
+ case REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN:
+ addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
+ .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 0))
+ .toArray(RefreshPeerProcedure[]::new));
+ if (fromState == SyncReplicationState.STANDBY &&
+ toState == SyncReplicationState.DOWNGRADE_ACTIVE) {
+ setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
+ } else {
+ setNextState(PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
+ }
+ return Flow.HAS_MORE_STATE;
+ case REPLAY_REMOTE_WAL_IN_PEER:
+ // TODO: replay remote wal when transiting from S to DA.
+ setNextState(PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
return Flow.HAS_MORE_STATE;
- case REFRESH_PEER_ON_RS:
- // TODO: Need add child procedure for every RegionServer
- setNextState(PeerModificationState.POST_PEER_MODIFICATION);
+ case REOPEN_ALL_REGIONS_IN_PEER:
+ try {
+ addChildProcedure(
+ env.getAssignmentManager().createReopenProcedures(getRegionsToReopen(env)));
+ } catch (IOException e) {
+ LOG.warn("Failed to schedule region reopen for peer {} when starting transiting sync " +
+ "replication peer state from {} to {}, retry", peerId, fromState, toState, e);
+ throw new ProcedureYieldException();
+ }
+ setNextState(
+ PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
return Flow.HAS_MORE_STATE;
- case POST_PEER_MODIFICATION:
+ case TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE:
try {
- postPeerModification(env);
+ env.getReplicationPeerManager().transitPeerSyncReplicationState(peerId, toState);
} catch (ReplicationException e) {
- LOG.warn("{} failed to call postPeerModification for peer {}, retry",
- getClass().getName(), peerId, e);
+ LOG.warn("Failed to update peer storage for peer {} when ending transiting sync " +
+ "replication peer state from {} to {}, retry", peerId, fromState, toState, e);
throw new ProcedureYieldException();
+ }
+ setNextState(
+ PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END);
+ return Flow.HAS_MORE_STATE;
+ case REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END:
+ addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
+ .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 1))
+ .toArray(RefreshPeerProcedure[]::new));
+ setNextState(
+ PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
+ // Return instead of falling through into the POST case, so that the post hook runs as a
+ // separate state like every other step of this procedure.
+ return Flow.HAS_MORE_STATE;
+ case POST_PEER_SYNC_REPLICATION_STATE_TRANSITION:
+ try {
+ postTransit(env);
} catch (IOException e) {
- LOG.warn("{} failed to call post CP hook for peer {}, " +
- "ignore since the procedure has already done", getClass().getName(), peerId, e);
+ LOG.warn(
+ "Failed to call post CP hook for peer {} when transiting sync replication " +
+ "peer state from {} to {}, ignore since the procedure has already done",
+ peerId, fromState, toState, e);
}
- releaseLatch();
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException("unhandled state=" + state);
}
}
- private void releaseLatch() {
- ProcedurePrepareLatch.releaseLatch(latch, this);
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c53906e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 240de85..ddd6a06 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1802,21 +1802,27 @@ public class HRegionServer extends HasThread implements
* be hooked up to WAL.
*/
private void setupWALAndReplication() throws IOException {
+ boolean isMasterNoTableOrSystemTableOnly = this instanceof HMaster &&
+ (!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf));
+ if (isMasterNoTableOrSystemTableOnly) {
+ conf.setBoolean(HConstants.SYNC_REPLICATION_ENABLED, false);
+ }
WALFactory factory = new WALFactory(conf, serverName.toString());
+ if (!isMasterNoTableOrSystemTableOnly) {
+ // TODO Replication make assumptions here based on the default filesystem impl
+ Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+ String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
- // TODO Replication make assumptions here based on the default filesystem impl
- Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
- String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
-
- Path logDir = new Path(walRootDir, logName);
- LOG.debug("logDir={}", logDir);
- if (this.walFs.exists(logDir)) {
- throw new RegionServerRunningException(
- "Region server has already created directory at " + this.serverName.toString());
+ Path logDir = new Path(walRootDir, logName);
+ LOG.debug("logDir={}", logDir);
+ if (this.walFs.exists(logDir)) {
+ throw new RegionServerRunningException(
+ "Region server has already created directory at " + this.serverName.toString());
+ }
+ // Instantiate replication if replication enabled. Pass it the log directories.
+ createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir,
+ factory.getWALProvider());
}
- // Instantiate replication if replication enabled. Pass it the log directories.
- createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir,
- factory.getWALProvider());
this.walFactory = factory;
}
@@ -2940,11 +2946,6 @@ public class HRegionServer extends HasThread implements
*/
private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
FileSystem walFs, Path walDir, Path oldWALDir, WALProvider walProvider) throws IOException {
- if ((server instanceof HMaster) &&
- (!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf))) {
- return;
- }
-
// read in the name of the source replication class from the config file.
String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c53906e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
index 23ba773..4529943 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
@@ -18,17 +18,22 @@
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.replication.regionserver.PeerProcedureHandler;
+import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
import org.apache.yetus.audience.InterfaceAudience;
/**
- * A source for a replication stream has to expose this service.
- * This service allows an application to hook into the
- * regionserver and watch for new transactions.
+ * A source for a replication stream has to expose this service. This service allows an application
+ * to hook into the regionserver and watch for new transactions.
*/
@InterfaceAudience.Private
public interface ReplicationSourceService extends ReplicationService {
/**
+ * Returns an info provider for sync replication peer.
+ */
+ SyncReplicationPeerInfoProvider getSyncReplicationPeerInfoProvider();
+
+ /**
* Returns a Handler to handle peer procedures.
*/
PeerProcedureHandler getPeerProcedureHandler();
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c53906e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java
index 6df2af9..efafd09 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerActionListener.java
@@ -28,8 +28,8 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public interface PeerActionListener {
- default void peerRemoved(String peerId) {}
+ static final PeerActionListener DUMMY = new PeerActionListener() {};
default void peerSyncReplicationStateChange(String peerId, SyncReplicationState from,
- SyncReplicationState to) {}
+ SyncReplicationState to, int stage) {}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c53906e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
index 65da9af..52b604b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java
@@ -15,11 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
-
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.yetus.audience.InterfaceAudience;
@@ -29,13 +28,16 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public interface PeerProcedureHandler {
- public void addPeer(String peerId) throws ReplicationException, IOException;
+ void addPeer(String peerId) throws ReplicationException, IOException;
+
+ void removePeer(String peerId) throws ReplicationException, IOException;
- public void removePeer(String peerId) throws ReplicationException, IOException;
+ void disablePeer(String peerId) throws ReplicationException, IOException;
- public void disablePeer(String peerId) throws ReplicationException, IOException;
+ void enablePeer(String peerId) throws ReplicationException, IOException;
- public void enablePeer(String peerId) throws ReplicationException, IOException;
+ void updatePeerConfig(String peerId) throws ReplicationException, IOException;
- public void updatePeerConfig(String peerId) throws ReplicationException, IOException;
+ void transitSyncReplicationPeerState(String peerId, int stage, HRegionServer rs)
+ throws ReplicationException, IOException;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c53906e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
index 78c1977..7fc9f53 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
@@ -19,23 +19,32 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.concurrent.locks.Lock;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(PeerProcedureHandlerImpl.class);
+
private final ReplicationSourceManager replicationSourceManager;
+ private final PeerActionListener peerActionListener;
private final KeyLocker<String> peersLock = new KeyLocker<>();
- public PeerProcedureHandlerImpl(ReplicationSourceManager replicationSourceManager) {
+ public PeerProcedureHandlerImpl(ReplicationSourceManager replicationSourceManager,
+ PeerActionListener peerActionListener) {
this.replicationSourceManager = replicationSourceManager;
+ this.peerActionListener = peerActionListener;
}
@Override
@@ -61,7 +70,6 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
}
private void refreshPeerState(String peerId) throws ReplicationException, IOException {
- PeerState newState;
Lock peerLock = peersLock.acquireLock(peerId);
ReplicationPeerImpl peer = null;
PeerState oldState = null;
@@ -72,7 +80,7 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
}
oldState = peer.getPeerState();
- newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
+ PeerState newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId);
// RS need to start work with the new replication state change
if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) {
replicationSourceManager.refreshSources(peerId);
@@ -132,4 +140,42 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
peerLock.unlock();
}
}
+
+ /**
+ * Handles one stage of a sync replication state transition for the given peer.
+ * <p>
+ * At stage 0 the pending (new) state is refreshed through {@link ReplicationPeers} and the
+ * {@link PeerActionListener} is notified. At any other stage the listener is notified with the
+ * already cached pending state and the peer is then asked to complete the transition. A call
+ * that finds the pending state already set (stage 0) or already cleared (other stages) is
+ * treated as a retry: it logs a warning and returns without doing anything.
+ * @param peerId the id of the replication peer
+ * @param stage the stage of the transition; 0 is the first stage
+ * @param rs the hosting region server; not used by this implementation
+ * @throws ReplicationException if the peer is not cached or is not a sync replication peer
+ */
+ @Override
+ public void transitSyncReplicationPeerState(String peerId, int stage, HRegionServer rs)
+ throws ReplicationException, IOException {
+ ReplicationPeers replicationPeers = replicationSourceManager.getReplicationPeers();
+ // Take the per-peer lock, the same lock refreshPeerState acquires for this peer id.
+ Lock peerLock = peersLock.acquireLock(peerId);
+ try {
+ ReplicationPeerImpl peer = replicationPeers.getPeer(peerId);
+ if (peer == null) {
+ throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
+ }
+ if (!peer.getPeerConfig().isSyncReplication()) {
+ throw new ReplicationException("Peer with id=" + peerId + " is not synchronous.");
+ }
+ SyncReplicationState newState = peer.getNewSyncReplicationState();
+ if (stage == 0) {
+ // A pending state other than NONE means stage 0 has already been applied here.
+ if (newState != SyncReplicationState.NONE) {
+ LOG.warn("The new sync replication state for peer {} has already been set to {}, " +
+ "this should be a retry, give up", peerId, newState);
+ return;
+ }
+ newState = replicationPeers.refreshPeerNewSyncReplicationState(peerId);
+ SyncReplicationState oldState = peer.getSyncReplicationState();
+ peerActionListener.peerSyncReplicationStateChange(peerId, oldState, newState, stage);
+ } else {
+ if (newState == SyncReplicationState.NONE) {
+ // newState is always NONE in this branch, so report the peer's actual current state
+ // rather than newState, which is what the message promises.
+ LOG.warn("The new sync replication state for peer {} has already been clear, and the " +
+ "current state is {}, this should be a retry, give up", peerId,
+ peer.getSyncReplicationState());
+ return;
+ }
+ SyncReplicationState oldState = peer.getSyncReplicationState();
+ peerActionListener.peerSyncReplicationStateChange(peerId, oldState, newState, stage);
+ // NOTE(review): presumably promotes the pending state to current and resets the pending
+ // state to NONE - confirm against ReplicationPeerImpl.
+ peer.transitSyncReplicationState();
+ }
+ } finally {
+ peerLock.unlock();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c53906e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
index 7ada24b..8fe16bc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java
@@ -35,12 +35,15 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.R
public class RefreshPeerCallable implements RSProcedureCallable {
private static final Logger LOG = Logger.getLogger(RefreshPeerCallable.class);
+
private HRegionServer rs;
private String peerId;
private PeerModificationType type;
+ private int stage;
+
private Exception initError;
@Override
@@ -67,6 +70,9 @@ public class RefreshPeerCallable implements RSProcedureCallable {
case UPDATE_PEER_CONFIG:
handler.updatePeerConfig(this.peerId);
break;
+ case TRANSIT_SYNC_REPLICATION_STATE:
+ handler.transitSyncReplicationPeerState(peerId, stage, rs);
+ break;
default:
throw new IllegalArgumentException("Unknown peer modification type: " + type);
}
@@ -80,6 +86,7 @@ public class RefreshPeerCallable implements RSProcedureCallable {
RefreshPeerParameter param = RefreshPeerParameter.parseFrom(parameter);
this.peerId = param.getPeerId();
this.type = param.getType();
+ this.stage = param.getStage();
} catch (InvalidProtocolBufferException e) {
initError = e;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c53906e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 8290ac3..2846d2c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.yetus.audience.InterfaceAudience;
@@ -66,6 +67,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
private ReplicationTracker replicationTracker;
private Configuration conf;
private ReplicationSink replicationSink;
+ private SyncReplicationPeerInfoProvider syncReplicationPeerInfoProvider;
// Hosting server
private Server server;
/** Statistics thread schedule pool */
@@ -120,19 +122,30 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
} catch (KeeperException ke) {
throw new IOException("Could not read cluster id", ke);
}
+ SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager();
this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers,
replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId,
- walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty());
+ walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(),
+ mapping);
+ this.syncReplicationPeerInfoProvider =
+ new SyncReplicationPeerInfoProviderImpl(replicationPeers, mapping);
+ PeerActionListener peerActionListener = PeerActionListener.DUMMY;
if (walProvider != null) {
walProvider
.addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
+ if (walProvider instanceof SyncReplicationWALProvider) {
+ SyncReplicationWALProvider syncWALProvider = (SyncReplicationWALProvider) walProvider;
+ peerActionListener = syncWALProvider;
+ syncWALProvider.setPeerInfoProvider(syncReplicationPeerInfoProvider);
+ }
}
this.statsThreadPeriod =
this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
LOG.debug("Replication stats-in-log period={} seconds", this.statsThreadPeriod);
this.replicationLoad = new ReplicationLoad();
- this.peerProcedureHandler = new PeerProcedureHandlerImpl(replicationManager);
+ this.peerProcedureHandler =
+ new PeerProcedureHandlerImpl(replicationManager, peerActionListener);
}
@Override
@@ -270,4 +283,9 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics);
}
+
+ /**
+ * Returns the provider held in the {@code syncReplicationPeerInfoProvider} field of this
+ * replication service.
+ */
+ @Override
+ public SyncReplicationPeerInfoProvider getSyncReplicationPeerInfoProvider() {
+ return syncReplicationPeerInfoProvider;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c53906e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 70cd986..72d1771 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
@@ -135,6 +136,8 @@ public class ReplicationSourceManager implements ReplicationListener {
// For recovered source, the queue id's format is peer_id-servername-*
private final ConcurrentMap<String, Map<String, NavigableSet<String>>> walsByIdRecoveredQueues;
+ private final SyncReplicationPeerMappingManager syncReplicationPeerMappingManager;
+
private final Configuration conf;
private final FileSystem fs;
// The paths to the latest log of each wal group, for new coming peers
@@ -169,9 +172,8 @@ public class ReplicationSourceManager implements ReplicationListener {
public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
- WALFileLengthProvider walFileLengthProvider) throws IOException {
- // CopyOnWriteArrayList is thread-safe.
- // Generally, reading is more than modifying.
+ WALFileLengthProvider walFileLengthProvider,
+ SyncReplicationPeerMappingManager syncReplicationPeerMappingManager) throws IOException {
this.sources = new ConcurrentHashMap<>();
this.queueStorage = queueStorage;
this.replicationPeers = replicationPeers;
@@ -184,10 +186,11 @@ public class ReplicationSourceManager implements ReplicationListener {
this.fs = fs;
this.logDir = logDir;
this.oldLogDir = oldLogDir;
- this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000); // 30
- // seconds
+ // 30 seconds
+ this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000);
this.clusterId = clusterId;
this.walFileLengthProvider = walFileLengthProvider;
+ this.syncReplicationPeerMappingManager = syncReplicationPeerMappingManager;
this.replicationTracker.registerListener(this);
// It's preferable to failover 1 RS at a time, but with good zk servers
// more could be processed at the same time.
@@ -248,8 +251,11 @@ public class ReplicationSourceManager implements ReplicationListener {
}
/**
- * 1. Add peer to replicationPeers 2. Add the normal source and related replication queue 3. Add
- * HFile Refs
+ * <ol>
+ * <li>Add peer to replicationPeers</li>
+ * <li>Add the normal source and related replication queue</li>
+ * <li>Add HFile Refs</li>
+ * </ol>
* @param peerId the id of replication peer
*/
public void addPeer(String peerId) throws IOException {
@@ -268,13 +274,16 @@ public class ReplicationSourceManager implements ReplicationListener {
}
/**
- * 1. Remove peer for replicationPeers 2. Remove all the recovered sources for the specified id
- * and related replication queues 3. Remove the normal source and related replication queue 4.
- * Remove HFile Refs
+ * <ol>
+ * <li>Remove peer from replicationPeers</li>
+ * <li>Remove all the recovered sources for the specified id and related replication queues</li>
+ * <li>Remove the normal source and related replication queue</li>
+ * <li>Remove HFile Refs</li>
+ * </ol>
* @param peerId the id of the replication peer
*/
public void removePeer(String peerId) {
- replicationPeers.removePeer(peerId);
+ ReplicationPeer peer = replicationPeers.removePeer(peerId);
String terminateMessage = "Replication stream was removed by a user";
List<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<>();
// synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
@@ -305,7 +314,10 @@ public class ReplicationSourceManager implements ReplicationListener {
deleteQueue(peerId);
this.walsById.remove(peerId);
}
-
+ ReplicationPeerConfig peerConfig = peer.getPeerConfig();
+ if (peerConfig.isSyncReplication()) {
+ syncReplicationPeerMappingManager.remove(peerId, peerConfig);
+ }
// Remove HFile Refs
abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(peerId));
}
@@ -357,6 +369,10 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
}
+ ReplicationPeerConfig peerConfig = peer.getPeerConfig();
+ if (peerConfig.isSyncReplication()) {
+ syncReplicationPeerMappingManager.add(peer.getId(), peerConfig);
+ }
src.startup();
return src;
}
@@ -436,6 +452,7 @@ public class ReplicationSourceManager implements ReplicationListener {
// Delete queue from storage and memory
deleteQueue(src.getQueueId());
this.walsById.remove(src.getQueueId());
+
}
/**