You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/01/27 06:39:51 UTC
[09/14] hbase git commit: HBASE-19781 Add a new cluster state flag for synchronous replication
HBASE-19781 Add a new cluster state flag for synchronous replication
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cd9848e7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cd9848e7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cd9848e7
Branch: refs/heads/HBASE-19064
Commit: cd9848e7a97261fd0aa86d8d24c8b195f32185a8
Parents: c795686
Author: Guanghao Zhang <zg...@apache.org>
Authored: Mon Jan 22 11:44:49 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sat Jan 27 14:35:42 2018 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/client/Admin.java | 39 +++++
.../apache/hadoop/hbase/client/AsyncAdmin.java | 31 ++++
.../hadoop/hbase/client/AsyncHBaseAdmin.java | 7 +
.../hbase/client/ConnectionImplementation.java | 9 ++
.../apache/hadoop/hbase/client/HBaseAdmin.java | 26 +++
.../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 15 ++
.../client/ShortCircuitMasterConnection.java | 9 ++
.../replication/ReplicationPeerConfigUtil.java | 26 +--
.../replication/ReplicationPeerDescription.java | 10 +-
.../hbase/replication/SyncReplicationState.java | 48 ++++++
.../hbase/shaded/protobuf/RequestConverter.java | 10 ++
.../src/main/protobuf/Master.proto | 4 +
.../src/main/protobuf/MasterProcedure.proto | 6 +-
.../src/main/protobuf/Replication.proto | 20 +++
.../replication/ReplicationPeerStorage.java | 18 ++-
.../hbase/replication/ReplicationUtils.java | 1 +
.../replication/ZKReplicationPeerStorage.java | 60 +++++--
.../replication/TestReplicationStateBasic.java | 23 ++-
.../TestZKReplicationPeerStorage.java | 12 +-
.../hbase/coprocessor/MasterObserver.java | 23 +++
.../org/apache/hadoop/hbase/master/HMaster.java | 12 ++
.../hbase/master/MasterCoprocessorHost.java | 21 +++
.../hadoop/hbase/master/MasterRpcServices.java | 17 ++
.../hadoop/hbase/master/MasterServices.java | 9 ++
.../procedure/PeerProcedureInterface.java | 2 +-
.../replication/ReplicationPeerManager.java | 51 +++++-
...ransitPeerSyncReplicationStateProcedure.java | 159 +++++++++++++++++++
.../hbase/security/access/AccessController.java | 8 +
.../replication/TestReplicationAdmin.java | 62 ++++++++
.../hbase/master/MockNoopMasterServices.java | 11 +-
.../cleaner/TestReplicationHFileCleaner.java | 4 +-
.../TestReplicationTrackerZKImpl.java | 6 +-
.../TestReplicationSourceManager.java | 3 +-
.../security/access/TestAccessController.java | 16 ++
.../hbase/util/TestHBaseFsckReplication.java | 5 +-
.../src/main/ruby/hbase/replication_admin.rb | 15 ++
hbase-shell/src/main/ruby/shell.rb | 1 +
.../src/main/ruby/shell/commands/list_peers.rb | 6 +-
.../transit_peer_sync_replication_state.rb | 44 +++++
.../test/ruby/hbase/replication_admin_test.rb | 24 +++
40 files changed, 818 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
index b8546fa..167d6f3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
@@ -2648,6 +2649,44 @@ public interface Admin extends Abortable, Closeable {
List<ReplicationPeerDescription> listReplicationPeers(Pattern pattern) throws IOException;
/**
+ * Transitions the current cluster to a new state in a synchronous replication peer.
+ * <p>
+ * See {@link #transitReplicationPeerSyncReplicationStateAsync(String, SyncReplicationState)} for
+ * the variant that returns a {@link Future} instead of waiting for the operation.
+ * @param peerId a short name that identifies the peer
+ * @param state the new state of the current cluster
+ * @throws IOException if a remote or network exception occurs
+ */
+ void transitReplicationPeerSyncReplicationState(String peerId, SyncReplicationState state)
+ throws IOException;
+
+ /**
+ * Transitions the current cluster to a new state in a synchronous replication peer, without
+ * blocking to wait for the operation to finish.
+ * <p>
+ * You can use Future.get(long, TimeUnit) to wait on the operation to complete. It may throw
+ * ExecutionException if there was an error while executing the operation or TimeoutException in
+ * case the wait timeout was not long enough to allow the operation to complete.
+ * @param peerId a short name that identifies the peer
+ * @param state the new state of the current cluster
+ * @return a Future that can be used to wait for the operation to complete
+ * @throws IOException if a remote or network exception occurs
+ */
+ Future<Void> transitReplicationPeerSyncReplicationStateAsync(String peerId,
+ SyncReplicationState state) throws IOException;
+
+ /**
+ * Gets the current cluster state in a synchronous replication peer.
+ * <p>
+ * The peer id is quoted before it is compiled into the lookup pattern, so an id containing
+ * regular expression metacharacters is matched literally, and the returned list is searched for
+ * the exact id instead of trusting its first element.
+ * @param peerId a short name that identifies the peer
+ * @return the current cluster state
+ * @throws IOException if a remote or network exception occurs, or if no peer with the given id
+ * exists
+ */
+ default SyncReplicationState getReplicationPeerSyncReplicationState(String peerId)
+ throws IOException {
+ // Pattern.quote keeps ids such as "a.b" or "a+" from being interpreted as a regex.
+ List<ReplicationPeerDescription> peers =
+ listReplicationPeers(Pattern.compile(Pattern.quote(peerId)));
+ for (ReplicationPeerDescription peer : peers) {
+ if (peer.getPeerId().equals(peerId)) {
+ return peer.getSyncReplicationState();
+ }
+ }
+ throw new IOException("Replication peer " + peerId + " does not exist");
+ }
+
+ /**
* Mark region server(s) as decommissioned to prevent additional regions from getting
* assigned to them. Optionally unload the regions on the servers. If there are multiple servers
* to be decommissioned, decommissioning them at the same time can prevent wasteful region
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index 35cdd3f..895e7ff 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
import com.google.protobuf.RpcChannel;
+import java.io.IOException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
@@ -42,6 +43,7 @@ import org.apache.hadoop.hbase.quotas.QuotaFilter;
import org.apache.hadoop.hbase.quotas.QuotaSettings;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -600,6 +602,35 @@ public interface AsyncAdmin {
ReplicationPeerConfig peerConfig);
/**
+ * Transitions the current cluster to a new state in a synchronous replication peer.
+ * @param peerId a short name that identifies the peer
+ * @param state the new state of the current cluster
+ * @return a {@link CompletableFuture} for the operation; it carries no result value
+ */
+ CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
+ SyncReplicationState state);
+
+ /**
+ * Gets the current cluster state in a synchronous replication peer.
+ * <p>
+ * The peer id is quoted before it is compiled into the lookup pattern, so an id containing
+ * regular expression metacharacters is matched literally, and the returned list is searched for
+ * the exact id instead of trusting its first element.
+ * @param peerId a short name that identifies the peer
+ * @return the current cluster state wrapped by a {@link CompletableFuture}. The future completes
+ * exceptionally with an {@link IOException} if no peer with the given id exists.
+ */
+ default CompletableFuture<SyncReplicationState>
+ getReplicationPeerSyncReplicationState(String peerId) {
+ CompletableFuture<SyncReplicationState> future = new CompletableFuture<>();
+ // Pattern.quote keeps ids such as "a.b" or "a+" from being interpreted as a regex.
+ listReplicationPeers(Pattern.compile(Pattern.quote(peerId))).whenComplete((peers, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ for (ReplicationPeerDescription peer : peers) {
+ if (peer.getPeerId().equals(peerId)) {
+ future.complete(peer.getSyncReplicationState());
+ return;
+ }
+ }
+ future.completeExceptionally(
+ new IOException("Replication peer " + peerId + " does not exist"));
+ });
+ return future;
+ }
+
+ /**
* Append the replicable table-cf config of the specified peer
* @param peerId a short that identifies the cluster
* @param tableCfs A map from tableName to column family names
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index 9b2390c..44771fd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.quotas.QuotaFilter;
import org.apache.hadoop.hbase.quotas.QuotaSettings;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -414,6 +415,12 @@ class AsyncHBaseAdmin implements AsyncAdmin {
}
@Override
+ public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
+ SyncReplicationState clusterState) {
+ // Hand the call to the raw admin and pass its future through wrap(...) before returning it.
+ CompletableFuture<Void> rawFuture =
+ rawAdmin.transitReplicationPeerSyncReplicationState(peerId, clusterState);
+ return wrap(rawFuture);
+ }
+
+ @Override
public CompletableFuture<Void> appendReplicationPeerTableCFs(String peerId,
Map<TableName, List<String>> tableCfs) {
return wrap(rawAdmin.appendReplicationPeerTableCFs(peerId, tableCfs));
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 8807884..5407c6d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -123,6 +123,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListR
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
@@ -1724,6 +1726,13 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
MasterProtos.ClearDeadServersRequest request) throws ServiceException {
return stub.clearDeadServers(controller, request);
}
+
+ @Override
+ public TransitReplicationPeerSyncReplicationStateResponse
+ transitReplicationPeerSyncReplicationState(RpcController controller,
+ TransitReplicationPeerSyncReplicationStateRequest request) throws ServiceException {
+ // Pure pass-through: forward the controller and request to the wrapped stub.
+ TransitReplicationPeerSyncReplicationStateResponse response =
+ stub.transitReplicationPeerSyncReplicationState(controller, request);
+ return response;
+ }
};
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 8685984..c01b891 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -86,6 +86,7 @@ import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
@@ -206,6 +207,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Disab
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
@@ -3991,6 +3993,30 @@ public class HBaseAdmin implements Admin {
}
@Override
+ public void transitReplicationPeerSyncReplicationState(String peerId, SyncReplicationState state)
+ throws IOException {
+ // Start the transition, then wait on its future for at most syncWaitTimeout milliseconds.
+ Future<Void> pending = transitReplicationPeerSyncReplicationStateAsync(peerId, state);
+ get(pending, this.syncWaitTimeout, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public Future<Void> transitReplicationPeerSyncReplicationStateAsync(String peerId,
+ SyncReplicationState state) throws IOException {
+ TransitReplicationPeerSyncReplicationStateResponse response =
+ executeCallable(new MasterCallable<TransitReplicationPeerSyncReplicationStateResponse>(
+ getConnection(), getRpcControllerFactory()) {
+ @Override
+ protected TransitReplicationPeerSyncReplicationStateResponse rpcCall() throws Exception {
+ return master.transitReplicationPeerSyncReplicationState(getRpcController(),
+ RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId,
+ state));
+ }
+ });
+ return new ReplicationFuture(this, peerId, response.getProcId(),
+ () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE");
+ }
+
+ @Override
public void appendReplicationPeerTableCFs(String id,
Map<TableName, List<String>> tableCfs)
throws ReplicationException, IOException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 050bfe2..30a372d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
@@ -255,6 +256,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListR
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
@@ -1613,6 +1616,18 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
@Override
+ public CompletableFuture<Void> transitReplicationPeerSyncReplicationState(String peerId,
+ SyncReplicationState clusterState) {
+ return this
+ .<TransitReplicationPeerSyncReplicationStateRequest, TransitReplicationPeerSyncReplicationStateResponse> procedureCall(
+ RequestConverter.buildTransitReplicationPeerSyncReplicationStateRequest(peerId,
+ clusterState),
+ (s, c, req, done) -> s.transitReplicationPeerSyncReplicationState(c, req, done),
+ (resp) -> resp.getProcId(), new ReplicationProcedureBiConsumer(peerId,
+ () -> "TRANSIT_REPLICATION_PEER_SYNCHRONOUS_REPLICATION_STATE"));
+ }
+
+ @Override
public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
Map<TableName, List<String>> tableCfs) {
if (tableCfs == null) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java
index 50690b4..7bb65d2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java
@@ -166,6 +166,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListR
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
@@ -638,4 +640,11 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection {
throws ServiceException {
return stub.splitRegion(controller, request);
}
+
+ @Override
+ public TransitReplicationPeerSyncReplicationStateResponse
+ transitReplicationPeerSyncReplicationState(RpcController controller,
+ TransitReplicationPeerSyncReplicationStateRequest request) throws ServiceException {
+ // Pure pass-through: forward the controller and request to the wrapped stub.
+ TransitReplicationPeerSyncReplicationStateResponse response =
+ stub.transitReplicationPeerSyncReplicationState(controller, request);
+ return response;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
index 642149b..86b49ea 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@@ -391,25 +392,28 @@ public final class ReplicationPeerConfigUtil {
return ProtobufUtil.prependPBMagic(bytes);
}
- public static ReplicationPeerDescription toReplicationPeerDescription(
- ReplicationProtos.ReplicationPeerDescription desc) {
- boolean enabled = ReplicationProtos.ReplicationState.State.ENABLED == desc.getState()
- .getState();
+ public static ReplicationPeerDescription
+ toReplicationPeerDescription(ReplicationProtos.ReplicationPeerDescription desc) { // protobuf description -> client-side description
+ boolean enabled =
+ ReplicationProtos.ReplicationState.State.ENABLED == desc.getState().getState(); // true only when the proto state is ENABLED
ReplicationPeerConfig config = convert(desc.getConfig());
- return new ReplicationPeerDescription(desc.getId(), enabled, config);
+ return new ReplicationPeerDescription(desc.getId(), enabled, config,
+ SyncReplicationState.valueOf(desc.getSyncReplicationState().getNumber())); // proto enum number -> client SyncReplicationState via valueOf(int)
}
- public static ReplicationProtos.ReplicationPeerDescription toProtoReplicationPeerDescription(
- ReplicationPeerDescription desc) {
+ /**
+ * Converts a client-side {@link ReplicationPeerDescription} into its protobuf form, copying the
+ * peer id, the enabled/disabled state, the peer config and the synchronous replication state.
+ * @param desc the client-side peer description
+ * @return the protobuf peer description
+ */
+ public static ReplicationProtos.ReplicationPeerDescription
+ toProtoReplicationPeerDescription(ReplicationPeerDescription desc) {
ReplicationProtos.ReplicationPeerDescription.Builder builder =
ReplicationProtos.ReplicationPeerDescription.newBuilder();
builder.setId(desc.getPeerId());
- ReplicationProtos.ReplicationState.Builder stateBuilder = ReplicationProtos.ReplicationState
- .newBuilder();
- stateBuilder.setState(desc.isEnabled() ? ReplicationProtos.ReplicationState.State.ENABLED
- : ReplicationProtos.ReplicationState.State.DISABLED);
+ ReplicationProtos.ReplicationState.Builder stateBuilder =
+ ReplicationProtos.ReplicationState.newBuilder();
+ stateBuilder.setState(desc.isEnabled() ? ReplicationProtos.ReplicationState.State.ENABLED :
+ ReplicationProtos.ReplicationState.State.DISABLED);
builder.setState(stateBuilder.build());
builder.setConfig(convert(desc.getPeerConfig()));
+ // Map by constant name instead of ordinal(): the result no longer depends on the declaration
+ // order of the Java enum, and a mismatch fails with IllegalArgumentException rather than a null
+ // from forNumber(...).
+ builder.setSyncReplicationState(
+ ReplicationProtos.SyncReplicationState.valueOf(desc.getSyncReplicationState().name()));
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java
index ba97d07..2d077c5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerDescription.java
@@ -28,11 +28,14 @@ public class ReplicationPeerDescription {
private final String id;
private final boolean enabled;
private final ReplicationPeerConfig config;
+ private final SyncReplicationState syncReplicationState;
- public ReplicationPeerDescription(String id, boolean enabled, ReplicationPeerConfig config) {
+ public ReplicationPeerDescription(String id, boolean enabled, ReplicationPeerConfig config,
+ SyncReplicationState syncReplicationState) { // syncReplicationState: state of the current cluster in this peer
this.id = id;
this.enabled = enabled;
this.config = config;
+ this.syncReplicationState = syncReplicationState; // stored as given; no null check is performed here
}
public String getPeerId() {
@@ -47,11 +50,16 @@ public class ReplicationPeerDescription {
return this.config;
}
+ /**
+ * @return the synchronous replication state this description was constructed with
+ */
+ public SyncReplicationState getSyncReplicationState() {
+ return syncReplicationState;
+ }
+
@Override
public String toString() {
// Renders "id : ..., enabled : ..., config : ..., syncReplicationState : ...".
StringBuilder sb = new StringBuilder("id : ");
sb.append(id);
sb.append(", enabled : ").append(enabled);
sb.append(", config : ").append(config);
+ sb.append(", syncReplicationState : ").append(syncReplicationState);
return sb.toString();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java
new file mode 100644
index 0000000..bd144e9
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/SyncReplicationState.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Used by synchronous replication. Describes the state of the current cluster in a synchronous
+ * replication peer, which is one of {@link SyncReplicationState#ACTIVE},
+ * {@link SyncReplicationState#DOWNGRADE_ACTIVE} or {@link SyncReplicationState#STANDBY}.
+ * <p>
+ * For asynchronous replication, the state is {@link SyncReplicationState#NONE}.
+ */
+@InterfaceAudience.Public
+public enum SyncReplicationState {
+ NONE, ACTIVE, DOWNGRADE_ACTIVE, STANDBY;
+
+ /**
+ * Looks up the state for a numeric code: 0 is NONE, 1 is ACTIVE, 2 is DOWNGRADE_ACTIVE and 3 is
+ * STANDBY.
+ * @param value the numeric code of the state
+ * @return the state that corresponds to the code
+ * @throws IllegalArgumentException if the code is none of the values listed above
+ */
+ public static SyncReplicationState valueOf(int value) {
+ if (value == 0) {
+ return NONE;
+ }
+ if (value == 1) {
+ return ACTIVE;
+ }
+ if (value == 2) {
+ return DOWNGRADE_ACTIVE;
+ }
+ if (value == 3) {
+ return STANDBY;
+ }
+ throw new IllegalArgumentException("Unknown synchronous replication state " + value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 8ac7058..659be2a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
@@ -146,6 +147,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Enabl
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
/**
@@ -1874,4 +1876,12 @@ public final class RequestConverter {
}
return pbServers;
}
+
+ /**
+ * Creates a protocol buffer TransitReplicationPeerSyncReplicationStateRequest.
+ * @param peerId id of the replication peer to put into the request
+ * @param state the synchronous replication state to put into the request
+ * @return the request carrying the peer id and the converted state
+ */
+ public static TransitReplicationPeerSyncReplicationStateRequest
+ buildTransitReplicationPeerSyncReplicationStateRequest(String peerId,
+ SyncReplicationState state) {
+ // Map by constant name instead of ordinal(): the result no longer depends on the declaration
+ // order of the Java enum, and a mismatch fails with IllegalArgumentException rather than a null
+ // from forNumber(...).
+ ReplicationProtos.SyncReplicationState pbState =
+ ReplicationProtos.SyncReplicationState.valueOf(state.name());
+ return TransitReplicationPeerSyncReplicationStateRequest.newBuilder().setPeerId(peerId)
+ .setSyncReplicationState(pbState).build();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-protocol-shaded/src/main/protobuf/Master.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/Master.proto b/hbase-protocol-shaded/src/main/protobuf/Master.proto
index 3a236c0..c2ab180 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Master.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Master.proto
@@ -962,6 +962,10 @@ service MasterService {
rpc ListReplicationPeers(ListReplicationPeersRequest)
returns(ListReplicationPeersResponse);
+ /** Transitions the state of the current cluster in a synchronous replication peer. */
+ rpc TransitReplicationPeerSyncReplicationState(TransitReplicationPeerSyncReplicationStateRequest)
+ returns(TransitReplicationPeerSyncReplicationStateResponse);
+
/** Returns a list of ServerNames marked as decommissioned. */
rpc ListDecommissionedRegionServers(ListDecommissionedRegionServersRequest)
returns(ListDecommissionedRegionServersResponse);
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index ae676ea..b3af029 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -409,4 +409,8 @@ message AddPeerStateData {
message UpdatePeerConfigStateData {
required ReplicationPeer peer_config = 1;
-}
\ No newline at end of file
+}
+
+message TransitPeerSyncReplicationStateStateData {
+ // snake_case like peer_config above; the generated Java accessors keep the same names.
+ required SyncReplicationState sync_replication_state = 1;
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-protocol-shaded/src/main/protobuf/Replication.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/Replication.proto
index 44295d8..de7b742 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Replication.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Replication.proto
@@ -63,12 +63,23 @@ message ReplicationState {
}
/**
+ * Indicate the state of the current cluster in a synchronous replication peer.
+ */
+enum SyncReplicationState {
+ NONE = 0;
+ ACTIVE = 1;
+ DOWNGRADE_ACTIVE = 2;
+ STANDBY = 3;
+}
+
+/**
* Used by replication. Description of the replication peer.
*/
message ReplicationPeerDescription {
required string id = 1;
required ReplicationState state = 2;
required ReplicationPeer config = 3;
+ // State of the current cluster in this peer when it is a synchronous replication peer.
+ // Lower_snake_case keeps the name consistent with the other messages in this file; the
+ // generated Java accessor is still getSyncReplicationState().
+ optional SyncReplicationState sync_replication_state = 4;
}
/**
@@ -137,3 +148,12 @@ message ListReplicationPeersRequest {
message ListReplicationPeersResponse {
repeated ReplicationPeerDescription peer_desc = 1;
}
+
+// Request to transit the state of the current cluster in a synchronous replication peer.
+message TransitReplicationPeerSyncReplicationStateRequest {
+ // Id of the replication peer whose state is being transited.
+ required string peer_id = 1;
+ // Requested state. Lower_snake_case matches peer_id; the generated Java accessor is still
+ // getSyncReplicationState().
+ required SyncReplicationState sync_replication_state = 2;
+}
+
+message TransitReplicationPeerSyncReplicationStateResponse {
+ required uint64 proc_id = 1;
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
index 1adda02..d2538ab 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
@@ -31,8 +31,8 @@ public interface ReplicationPeerStorage {
* Add a replication peer.
* @throws ReplicationException if there are errors accessing the storage service.
*/
- void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
- throws ReplicationException;
+ void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled,
+ SyncReplicationState syncReplicationState) throws ReplicationException;
/**
* Remove a replication peer.
@@ -70,4 +70,18 @@ public interface ReplicationPeerStorage {
* @throws ReplicationException if there are errors accessing the storage service.
*/
ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException;
+
+ /**
+ * Set the state of current cluster in a synchronous replication peer.
+ * @throws ReplicationException if there are errors accessing the storage service.
+ */
+ void setPeerSyncReplicationState(String peerId, SyncReplicationState state)
+ throws ReplicationException;
+
+ /**
+ * Get the state of current cluster in a synchronous replication peer.
+ * @throws ReplicationException if there are errors accessing the storage service.
+ */
+ SyncReplicationState getPeerSyncReplicationState(String peerId)
+ throws ReplicationException;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index ebe68a7..93fe91d 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
index a53500a..338ce3f 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
@@ -51,6 +52,8 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
public static final byte[] DISABLED_ZNODE_BYTES =
toByteArray(ReplicationProtos.ReplicationState.State.DISABLED);
+ public static final String SYNCHRONOUS_REPLICATION_STATE_ZNODE = "sync-rep-state";
+
/**
* The name of the znode that contains the replication status of a remote slave (i.e. peer)
* cluster.
@@ -79,21 +82,29 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
return ZNodePaths.joinZNode(peersZNode, peerId);
}
+ @VisibleForTesting
+ public String getSyncReplicationStateNode(String peerId) {
+ return ZNodePaths.joinZNode(getPeerNode(peerId), SYNCHRONOUS_REPLICATION_STATE_ZNODE);
+ }
+
@Override
- public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
- throws ReplicationException {
+ public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled,
+ SyncReplicationState syncReplicationState) throws ReplicationException {
+ List<ZKUtilOp> multiOps = Arrays.asList(
+ ZKUtilOp.createAndFailSilent(getPeerNode(peerId),
+ ReplicationPeerConfigUtil.toByteArray(peerConfig)),
+ ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId),
+ enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES),
+ ZKUtilOp.createAndFailSilent(getSyncReplicationStateNode(peerId),
+ Bytes.toBytes(syncReplicationState.ordinal())));
try {
ZKUtil.createWithParents(zookeeper, peersZNode);
- ZKUtil.multiOrSequential(zookeeper,
- Arrays.asList(
- ZKUtilOp.createAndFailSilent(getPeerNode(peerId),
- ReplicationPeerConfigUtil.toByteArray(peerConfig)),
- ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId),
- enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES)),
- false);
+ ZKUtil.multiOrSequential(zookeeper, multiOps, false);
} catch (KeeperException e) {
- throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfif=>"
- + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e);
+ throw new ReplicationException(
+ "Could not add peer with id=" + peerId + ", peerConfig=>" + peerConfig + ", state=" +
+ (enabled ? "ENABLED" : "DISABLED") + ", syncReplicationState=" + syncReplicationState,
+ e);
}
}
@@ -166,4 +177,31 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
"Failed to parse replication peer config for peer with id=" + peerId, e);
}
}
+
+ @Override
+ public void setPeerSyncReplicationState(String peerId, SyncReplicationState clusterState)
+ throws ReplicationException {
+ byte[] clusterStateBytes = Bytes.toBytes(clusterState.ordinal());
+ try {
+ ZKUtil.setData(zookeeper, getSyncReplicationStateNode(peerId), clusterStateBytes);
+ } catch (KeeperException e) {
+ throw new ReplicationException(
+ "Unable to change the cluster state for the synchronous replication peer with id=" +
+ peerId,
+ e);
+ }
+ }
+
+ /**
+  * Read the synchronous replication state of the current cluster for the given peer from the
+  * peer's sync-rep-state znode.
+  * @param peerId id of the replication peer
+  * @return the state decoded from the int stored in the znode
+  * @throws ReplicationException if the znode cannot be read, or yields no data
+  */
+ @Override
+ public SyncReplicationState getPeerSyncReplicationState(String peerId)
+     throws ReplicationException {
+   String stateNode = getSyncReplicationStateNode(peerId);
+   byte[] data;
+   try {
+     data = ZKUtil.getData(zookeeper, stateNode);
+   } catch (KeeperException e) {
+     throw new ReplicationException(
+       "Error getting cluster state for the synchronous replication peer with id=" + peerId, e);
+   } catch (InterruptedException e) {
+     // Restore the interrupt flag so callers can still observe the interruption after it has
+     // been wrapped into a ReplicationException.
+     Thread.currentThread().interrupt();
+     throw new ReplicationException(
+       "Error getting cluster state for the synchronous replication peer with id=" + peerId, e);
+   }
+   // Fail with the declared ReplicationException instead of an unchecked exception from
+   // Bytes.toInt when the read yielded nothing (for example, the state znode is absent).
+   if (data == null || data.length == 0) {
+     throw new ReplicationException(
+       "No synchronous replication state found for peer with id=" + peerId + " at " + stateNode);
+   }
+   return SyncReplicationState.valueOf(Bytes.toInt(data));
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index fccffb5..fe658a3 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -160,7 +160,8 @@ public abstract class TestReplicationStateBasic {
assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
rp.getPeerStorage().addPeer(ID_ONE,
- ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true);
+ ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true,
+ SyncReplicationState.NONE);
rqs.addPeerToHFileRefs(ID_ONE);
rqs.addHFileRefs(ID_ONE, files1);
assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
@@ -183,10 +184,12 @@ public abstract class TestReplicationStateBasic {
public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException {
rp.init();
rp.getPeerStorage().addPeer(ID_ONE,
- ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true);
+ ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true,
+ SyncReplicationState.NONE);
rqs.addPeerToHFileRefs(ID_ONE);
rp.getPeerStorage().addPeer(ID_TWO,
- ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true);
+ ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true,
+ SyncReplicationState.NONE);
rqs.addPeerToHFileRefs(ID_TWO);
List<Pair<Path, Path>> files1 = new ArrayList<>(3);
@@ -235,9 +238,13 @@ public abstract class TestReplicationStateBasic {
assertNumberOfPeers(0);
// Add some peers
- rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true);
+ rp.getPeerStorage().addPeer(ID_ONE,
+ ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true,
+ SyncReplicationState.NONE);
assertNumberOfPeers(1);
- rp.getPeerStorage().addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), true);
+ rp.getPeerStorage().addPeer(ID_TWO,
+ ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true,
+ SyncReplicationState.NONE);
assertNumberOfPeers(2);
assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationUtils
@@ -247,7 +254,9 @@ public abstract class TestReplicationStateBasic {
assertNumberOfPeers(1);
// Add one peer
- rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true);
+ rp.getPeerStorage().addPeer(ID_ONE,
+ ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true,
+ SyncReplicationState.NONE);
rp.addPeer(ID_ONE);
assertNumberOfPeers(2);
assertTrue(rp.getPeer(ID_ONE).isPeerEnabled());
@@ -311,7 +320,7 @@ public abstract class TestReplicationStateBasic {
// Add peers for the corresponding queues so they are not orphans
rp.getPeerStorage().addPeer("qId" + i,
ReplicationPeerConfig.newBuilder().setClusterKey("localhost:2818:/bogus" + i).build(),
- true);
+ true, SyncReplicationState.NONE);
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
index 3eb11da..bdaa9fd 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
@@ -82,8 +82,9 @@ public class TestZKReplicationPeerStorage {
Random rand = new Random(seed);
return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(rand.nextLong()))
.setReplicationEndpointImpl(Long.toHexString(rand.nextLong()))
- .setNamespaces(randNamespaces(rand)).setExcludeNamespaces(randNamespaces(rand))
- .setTableCFsMap(randTableCFs(rand)).setReplicateAllUserTables(rand.nextBoolean())
+ .setRemoteWALDir(Long.toHexString(rand.nextLong())).setNamespaces(randNamespaces(rand))
+ .setExcludeNamespaces(randNamespaces(rand)).setTableCFsMap(randTableCFs(rand))
+ .setExcludeTableCFsMap(randTableCFs(rand)).setReplicateAllUserTables(rand.nextBoolean())
.setBandwidth(rand.nextInt(1000)).build();
}
@@ -134,7 +135,8 @@ public class TestZKReplicationPeerStorage {
public void test() throws ReplicationException {
int peerCount = 10;
for (int i = 0; i < peerCount; i++) {
- STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0);
+ STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0,
+ SyncReplicationState.valueOf(i % 4));
}
List<String> peerIds = STORAGE.listPeerIds();
assertEquals(peerCount, peerIds.size());
@@ -158,6 +160,10 @@ public class TestZKReplicationPeerStorage {
for (int i = 0; i < peerCount; i++) {
assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i)));
}
+ for (int i = 0; i < peerCount; i++) {
+ assertEquals(SyncReplicationState.valueOf(i % 4),
+ STORAGE.getPeerSyncReplicationState(Integer.toString(i)));
+ }
String toRemove = Integer.toString(peerCount / 2);
STORAGE.removePeer(toRemove);
peerIds = STORAGE.listPeerIds();
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
index a17bc9f..8d2b55f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.quotas.GlobalQuotaSettings;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@@ -1232,6 +1233,28 @@ public interface MasterObserver {
String regex) throws IOException {}
/**
+ * Called before transiting the current cluster state of the given synchronous replication peer.
+ * @param ctx the environment to interact with the framework and master
+ * @param peerId a short name that identifies the peer
+ * @param state a new state
+ */
+ default void preTransitReplicationPeerSyncReplicationState(
+ final ObserverContext<MasterCoprocessorEnvironment> ctx, String peerId,
+ SyncReplicationState state) throws IOException {
+ }
+
+ /**
+ * Called after transiting the current cluster state of the given synchronous replication peer.
+ * @param ctx the environment to interact with the framework and master
+ * @param peerId a short name that identifies the peer
+ * @param state a new state
+ */
+ default void postTransitReplicationPeerSyncReplicationState(
+ final ObserverContext<MasterCoprocessorEnvironment> ctx, String peerId,
+ SyncReplicationState state) throws IOException {
+ }
+
+ /**
* Called before new LockProcedure is queued.
* @param ctx the environment to interact with the framework and master
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 88d8596..a495095 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -133,6 +133,7 @@ import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure;
import org.apache.hadoop.hbase.master.replication.ModifyPeerProcedure;
import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
+import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStateProcedure;
import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.mob.MobConstants;
@@ -166,6 +167,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.security.AccessDeniedException;
@@ -3410,6 +3412,16 @@ public class HMaster extends HRegionServer implements MasterServices {
return peers;
}
+ @Override
+ public long transitReplicationPeerSyncReplicationState(String peerId, SyncReplicationState state)
+ throws ReplicationException, IOException {
+ LOG.info(
+ getClientIdAuditPrefix() +
+ " transit current cluster state to {} in a synchronous replication peer id={}",
+ state, peerId);
+ return executePeerProcedure(new TransitPeerSyncReplicationStateProcedure(peerId, state));
+ }
+
/**
* Mark region server(s) as decommissioned (previously called 'draining') to prevent additional
* regions from getting assigned to them. Also unload the regions on the servers asynchronously.0
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
index 8396145..70c8647 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.quotas.GlobalQuotaSettings;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -1524,6 +1525,26 @@ public class MasterCoprocessorHost
});
}
+ public void preTransitReplicationPeerSyncReplicationState(final String peerId,
+ final SyncReplicationState clusterState) throws IOException {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
+ @Override
+ public void call(MasterObserver observer) throws IOException {
+ observer.preTransitReplicationPeerSyncReplicationState(this, peerId, clusterState);
+ }
+ });
+ }
+
+ public void postTransitReplicationPeerSyncReplicationState(final String peerId,
+ final SyncReplicationState clusterState) throws IOException {
+ execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
+ @Override
+ public void call(MasterObserver observer) throws IOException {
+ observer.postTransitReplicationPeerSyncReplicationState(this, peerId, clusterState);
+ }
+ });
+ }
+
public void preRequestLock(String namespace, TableName tableName, RegionInfo[] regionInfos,
LockType type, String description) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 377a9c6..f5bf117 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.AccessController;
@@ -285,6 +286,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListR
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
@@ -1958,6 +1961,20 @@ public class MasterRpcServices extends RSRpcServices
}
@Override
+ public TransitReplicationPeerSyncReplicationStateResponse
+ transitReplicationPeerSyncReplicationState(RpcController controller,
+ TransitReplicationPeerSyncReplicationStateRequest request) throws ServiceException {
+ try {
+ long procId = master.transitReplicationPeerSyncReplicationState(request.getPeerId(),
+ SyncReplicationState.valueOf(request.getSyncReplicationState().getNumber()));
+ return TransitReplicationPeerSyncReplicationStateResponse.newBuilder().setProcId(procId)
+ .build();
+ } catch (ReplicationException | IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
public ListReplicationPeersResponse listReplicationPeers(RpcController controller,
ListReplicationPeersRequest request) throws ServiceException {
ListReplicationPeersResponse.Builder response = ListReplicationPeersResponse.newBuilder();
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index 9d371bd..5c6f2dd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -480,6 +481,14 @@ public interface MasterServices extends Server {
IOException;
/**
+ * Set the current cluster state for a synchronous replication peer.
+ * @param peerId a short name that identifies the peer
+ * @param clusterState state of current cluster
+ * @return the id of the procedure submitted to perform the state transition
+ */
+ long transitReplicationPeerSyncReplicationState(String peerId, SyncReplicationState clusterState)
+ throws ReplicationException, IOException;
+
+ /**
* @return {@link LockManager} to lock namespaces/tables/regions.
*/
LockManager getLockManager();
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java
index 4abc9ad..fc5348e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java
@@ -25,7 +25,7 @@ import org.apache.yetus.audience.InterfaceStability;
public interface PeerProcedureInterface {
enum PeerOperationType {
- ADD, REMOVE, ENABLE, DISABLE, UPDATE_CONFIG, REFRESH
+ ADD, REMOVE, ENABLE, DISABLE, UPDATE_CONFIG, REFRESH, TRANSIT_SYNC_REPLICATION_STATE
}
String getPeerId();
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index d715e2e..9336fbd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.EnumMap;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -43,6 +45,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
@@ -61,6 +64,16 @@ public class ReplicationPeerManager {
private final ConcurrentMap<String, ReplicationPeerDescription> peers;
+ private final EnumMap<SyncReplicationState, EnumSet<SyncReplicationState>> allowedTransition =
+ new EnumMap<SyncReplicationState, EnumSet<SyncReplicationState>>(SyncReplicationState.class) {
+ {
+ put(SyncReplicationState.ACTIVE, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE));
+ put(SyncReplicationState.STANDBY, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE));
+ put(SyncReplicationState.DOWNGRADE_ACTIVE,
+ EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE));
+ }
+ };
+
ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
ConcurrentMap<String, ReplicationPeerDescription> peers) {
this.peerStorage = peerStorage;
@@ -163,6 +176,17 @@ public class ReplicationPeerManager {
}
}
+ public void preTransitPeerSyncReplicationState(String peerId, SyncReplicationState state)
+ throws DoNotRetryIOException {
+ ReplicationPeerDescription desc = checkPeerExists(peerId);
+ SyncReplicationState fromState = desc.getSyncReplicationState();
+ EnumSet<SyncReplicationState> allowedToStates = allowedTransition.get(fromState);
+ if (allowedToStates == null || !allowedToStates.contains(state)) {
+ throw new DoNotRetryIOException("Can not transit current cluster state from " + fromState +
+ " to " + state + " for peer id=" + peerId);
+ }
+ }
+
public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws ReplicationException {
if (peers.containsKey(peerId)) {
@@ -170,8 +194,12 @@ public class ReplicationPeerManager {
return;
}
ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build();
- peerStorage.addPeer(peerId, copiedPeerConfig, enabled);
- peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig));
+ SyncReplicationState syncReplicationState =
+ StringUtils.isBlank(peerConfig.getRemoteWALDir()) ? SyncReplicationState.NONE
+ : SyncReplicationState.DOWNGRADE_ACTIVE;
+ peerStorage.addPeer(peerId, copiedPeerConfig, enabled, syncReplicationState);
+ peers.put(peerId,
+ new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig, syncReplicationState));
}
public void removePeer(String peerId) throws ReplicationException {
@@ -190,7 +218,8 @@ public class ReplicationPeerManager {
return;
}
peerStorage.setPeerState(peerId, enabled);
- peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig()));
+ peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig(),
+ desc.getSyncReplicationState()));
}
public void enablePeer(String peerId) throws ReplicationException {
@@ -215,7 +244,8 @@ public class ReplicationPeerManager {
newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration());
ReplicationPeerConfig newPeerConfig = newPeerConfigBuilder.build();
peerStorage.updatePeerConfig(peerId, newPeerConfig);
- peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig));
+ peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig,
+ desc.getSyncReplicationState()));
}
public List<ReplicationPeerDescription> listPeers(Pattern pattern) {
@@ -231,6 +261,14 @@ public class ReplicationPeerManager {
return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
}
+ public void transitPeerSyncReplicationState(String peerId, SyncReplicationState state)
+ throws ReplicationException {
+ ReplicationPeerDescription desc = peers.get(peerId);
+ peerStorage.setPeerSyncReplicationState(peerId, state);
+ peers.put(peerId,
+ new ReplicationPeerDescription(peerId, desc.isEnabled(), desc.getPeerConfig(), state));
+ }
+
public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
// Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still
// on-going when the refresh peer config procedure is done, if a RS which has already been
@@ -350,10 +388,11 @@ public class ReplicationPeerManager {
for (String peerId : peerStorage.listPeerIds()) {
ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
boolean enabled = peerStorage.isPeerEnabled(peerId);
- peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig));
+ SyncReplicationState state = peerStorage.getPeerSyncReplicationState(peerId);
+ peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig, state));
}
return new ReplicationPeerManager(peerStorage,
- ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);
+ ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
new file mode 100644
index 0000000..d26eecc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import java.io.IOException;
+import java.util.Locale;
+
+import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.TransitPeerSyncReplicationStateStateData;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
+
+/**
+ * The procedure for transiting the current cluster state of a synchronous replication peer.
+ * <p>
+ * The steps are: run the pre hook and pre check, hand the new state to the peer manager, refresh
+ * the region servers (still a TODO in {@link #executeFromState}), and finally run the post
+ * hook.
+ */
+@InterfaceAudience.Private
+public class TransitPeerSyncReplicationStateProcedure extends ModifyPeerProcedure {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TransitPeerSyncReplicationStateProcedure.class);
+
+  /** The sync replication state this procedure transits the peer to. */
+  private SyncReplicationState state;
+
+  /** Leaves {@link #state} unset; {@link #deserializeStateData} assigns it. */
+  public TransitPeerSyncReplicationStateProcedure() {
+  }
+
+  /**
+   * @param peerId id of the replication peer to modify
+   * @param state the sync replication state to transit the peer to
+   */
+  public TransitPeerSyncReplicationStateProcedure(String peerId, SyncReplicationState state) {
+    super(peerId);
+    this.state = state;
+  }
+
+  @Override
+  public PeerOperationType getPeerOperationType() {
+    return PeerOperationType.TRANSIT_SYNC_REPLICATION_STATE;
+  }
+
+  /**
+   * Calls the coprocessor pre hook when a coprocessor host is available, then the peer manager's
+   * pre check for the requested state.
+   */
+  @Override
+  protected void prePeerModification(MasterProcedureEnv env)
+      throws IOException, ReplicationException {
+    MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+    if (cpHost != null) {
+      cpHost.preTransitReplicationPeerSyncReplicationState(peerId, state);
+    }
+    env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, state);
+  }
+
+  /** Hands the new state to the peer manager. */
+  @Override
+  protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
+    env.getReplicationPeerManager().transitPeerSyncReplicationState(peerId, state);
+  }
+
+  /** Logs the finished transition and calls the coprocessor post hook, if any. */
+  @Override
+  protected void postPeerModification(MasterProcedureEnv env)
+      throws IOException, ReplicationException {
+    LOG.info("Successfully transit current cluster state to {} in synchronous replication peer {}",
+      state, peerId);
+    MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+    if (cpHost != null) {
+      // Reuse the null-checked local rather than fetching the host from env a second time.
+      cpHost.postTransitReplicationPeerSyncReplicationState(peerId, state);
+    }
+  }
+
+  /** Appends the target state to the state data written by the parent class. */
+  @Override
+  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    super.serializeStateData(serializer);
+    // NOTE(review): relies on the enum ordinal matching the protobuf number -- confirm.
+    serializer.serialize(TransitPeerSyncReplicationStateStateData.newBuilder()
+      .setSyncReplicationState(ReplicationProtos.SyncReplicationState.forNumber(state.ordinal()))
+      .build());
+  }
+
+  /** Reads the target state back after the parent class has read its own state data. */
+  @Override
+  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    super.deserializeStateData(serializer);
+    TransitPeerSyncReplicationStateStateData data =
+        serializer.deserialize(TransitPeerSyncReplicationStateStateData.class);
+    state = SyncReplicationState.valueOf(data.getSyncReplicationState().getNumber());
+  }
+
+  /**
+   * Runs a single step of the procedure. The {@code state} parameter is the step to run; inside
+   * this method it hides the {@link #state} field.
+   * <p>
+   * A {@link ReplicationException} from the pre, update or post step is logged and replaced by a
+   * {@link ProcedureYieldException}. An {@link IOException} from the pre step marks the
+   * procedure as failed and ends it, while one from the post step is only logged.
+   */
+  @Override
+  protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
+      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+    switch (state) {
+      case PRE_PEER_MODIFICATION:
+        try {
+          prePeerModification(env);
+        } catch (IOException e) {
+          LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, " +
+            "mark the procedure as failure and give up", getClass().getName(), peerId, e);
+          // Locale.ROOT keeps the failure source name independent of the JVM default locale.
+          String source =
+              "master-" + getPeerOperationType().name().toLowerCase(Locale.ROOT) + "-peer";
+          setFailure(source, e);
+          releaseLatch();
+          return Flow.NO_MORE_STATE;
+        } catch (ReplicationException e) {
+          LOG.warn("{} failed to call prePeerModification for peer {}, retry", getClass().getName(),
+            peerId, e);
+          throw new ProcedureYieldException();
+        }
+        setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
+        return Flow.HAS_MORE_STATE;
+      case UPDATE_PEER_STORAGE:
+        try {
+          updatePeerStorage(env);
+        } catch (ReplicationException e) {
+          LOG.warn("{} update peer storage for peer {} failed, retry", getClass().getName(), peerId,
+            e);
+          throw new ProcedureYieldException();
+        }
+        setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
+        return Flow.HAS_MORE_STATE;
+      case REFRESH_PEER_ON_RS:
+        // TODO: Need add child procedure for every RegionServer
+        setNextState(PeerModificationState.POST_PEER_MODIFICATION);
+        return Flow.HAS_MORE_STATE;
+      case POST_PEER_MODIFICATION:
+        try {
+          postPeerModification(env);
+        } catch (ReplicationException e) {
+          LOG.warn("{} failed to call postPeerModification for peer {}, retry",
+            getClass().getName(), peerId, e);
+          throw new ProcedureYieldException();
+        } catch (IOException e) {
+          LOG.warn("{} failed to call post CP hook for peer {}, " +
+            "ignore since the procedure has already done", getClass().getName(), peerId, e);
+        }
+        releaseLatch();
+        return Flow.NO_MORE_STATE;
+      default:
+        throw new UnsupportedOperationException("unhandled state=" + state);
+    }
+  }
+
+  /** Releases {@code latch} through {@link ProcedurePrepareLatch}. */
+  private void releaseLatch() {
+    ProcedurePrepareLatch.releaseLatch(latch, this);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index 6acc133..491e4dc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -112,6 +112,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTrack
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
@@ -2501,6 +2502,13 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
}
@Override
+ public void preTransitReplicationPeerSyncReplicationState(
+ final ObserverContext<MasterCoprocessorEnvironment> ctx, String peerId,
+ SyncReplicationState clusterState) throws IOException {
+ requirePermission(ctx, "transitSyncReplicationPeerState", Action.ADMIN); // needs ADMIN
+ }
+
+ @Override
public void preListReplicationPeers(final ObserverContext<MasterCoprocessorEnvironment> ctx,
String regex) throws IOException {
requirePermission(ctx, "listReplicationPeers", Action.ADMIN);
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index cb06590..10f59e3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterReplicationEndpointForTest;
import org.apache.hadoop.hbase.replication.regionserver.TestReplicator.ReplicationEndpointForTest;
import org.apache.hadoop.hbase.testclassification.ClientTests;
@@ -977,4 +978,65 @@ public class TestReplicationAdmin {
// OK
}
}
+
+ @Test
+ public void testTransitSyncReplicationPeerState() throws Exception {
+ ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
+ builder.setClusterKey(KEY_ONE);
+ hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
+ assertEquals(SyncReplicationState.NONE,
+ hbaseAdmin.getReplicationPeerSyncReplicationState(ID_ONE));
+
+ try {
+ hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_ONE,
+ SyncReplicationState.DOWNGRADE_ACTIVE);
+ fail("Can't transit cluster state if replication peer don't config remote wal dir");
+ } catch (Exception e) {
+ // Expected: ID_ONE was added without a remote WAL dir, so the transition is rejected
+ }
+
+ String rootDir = "hdfs://srv1:9999/hbase";
+ builder = ReplicationPeerConfig.newBuilder();
+ builder.setClusterKey(KEY_SECOND);
+ builder.setRemoteWALDir(rootDir);
+ hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
+ assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
+ hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
+
+ // Disabling and re-enabling the peer must not change its SyncReplicationState
+ hbaseAdmin.disableReplicationPeer(ID_SECOND);
+ assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
+ hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
+ hbaseAdmin.enableReplicationPeer(ID_SECOND);
+ assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
+ hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
+
+ hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE);
+ assertEquals(SyncReplicationState.ACTIVE,
+ hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
+
+ try {
+ hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND,
+ SyncReplicationState.STANDBY);
+ fail("Can't transit cluster state from ACTIVE to STANDBY");
+ } catch (Exception e) {
+ // Expected: ACTIVE cannot transit directly to STANDBY
+ }
+
+ hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND,
+ SyncReplicationState.DOWNGRADE_ACTIVE);
+ assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
+ hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
+
+ hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.STANDBY);
+ assertEquals(SyncReplicationState.STANDBY,
+ hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
+
+ try {
+ hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE);
+ fail("Can't transit cluster state from STANDBY to ACTIVE");
+ } catch (Exception e) {
+ // Expected: STANDBY cannot transit directly to ACTIVE
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index 48e9e8d..b7d54d6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -54,11 +54,10 @@ import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import com.google.protobuf.Service;
-
public class MockNoopMasterServices implements MasterServices {
private final Configuration conf;
private final MetricsMaster metricsMaster;
@@ -481,4 +480,10 @@ public class MockNoopMasterServices implements MasterServices {
public ReplicationPeerManager getReplicationPeerManager() {
return null;
}
-}
+
+ @Override
+ public long transitReplicationPeerSyncReplicationState(String peerId,
+ SyncReplicationState clusterState) throws ReplicationException, IOException {
+ return 0; // no-op stub: both arguments are ignored
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
index 82cc388..78f231b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -102,7 +103,8 @@ public class TestReplicationHFileCleaner {
public void setup() throws ReplicationException, IOException {
root = TEST_UTIL.getDataTestDirOnTestFS();
rp.getPeerStorage().addPeer(peerId,
- ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL.getClusterKey()).build(), true);
+ ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL.getClusterKey()).build(), true,
+ SyncReplicationState.NONE);
rq.addPeerToHFileRefs(peerId);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
index 757d9a9..573cb80 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
@@ -150,11 +150,13 @@ public class TestReplicationTrackerZKImpl {
public void testPeerNameControl() throws Exception {
int exists = 0;
rp.getPeerStorage().addPeer("6",
- ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true);
+ ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true,
+ SyncReplicationState.NONE);
try {
rp.getPeerStorage().addPeer("6",
- ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true);
+ ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true,
+ SyncReplicationState.NONE);
} catch (ReplicationException e) {
if (e.getCause() instanceof KeeperException.NodeExistsException) {
exists++;
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 33216cb..d97f504 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -560,7 +561,7 @@ public abstract class TestReplicationSourceManager {
private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
final boolean waitForSource) throws Exception {
final ReplicationPeers rp = manager.getReplicationPeers();
- rp.getPeerStorage().addPeer(peerId, peerConfig, true);
+ rp.getPeerStorage().addPeer(peerId, peerConfig, true, SyncReplicationState.NONE);
try {
manager.addPeer(peerId);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/cd9848e7/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
index 06a55bc..0fbd0a6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
@@ -116,6 +116,7 @@ import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.Permission.Action;
@@ -2931,6 +2932,21 @@ public class TestAccessController extends SecureTestUtil {
}
@Test
+ public void testTransitSyncReplicationPeerState() throws Exception {
+ AccessTestAction action = new AccessTestAction() {
+ @Override
+ public Object run() throws Exception {
+ ACCESS_CONTROLLER.preTransitReplicationPeerSyncReplicationState(
+ ObserverContextImpl.createAndPrepare(CP_ENV), "test", SyncReplicationState.NONE);
+ return null; // no result needed; the action only invokes the pre hook
+ }
+ };
+
+ verifyAllowed(action, SUPERUSER, USER_ADMIN);
+ verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER);
+ }
+
+ @Test
public void testListReplicationPeers() throws Exception {
AccessTestAction action = new AccessTestAction() {
@Override