You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/01/12 14:05:13 UTC
[05/37] hbase git commit: HBASE-19216 Implement a general framework
to execute remote procedure on RS
HBASE-19216 Implement a general framework to execute remote procedure on RS
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a92d2226
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a92d2226
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a92d2226
Branch: refs/heads/HBASE-19397-branch-2
Commit: a92d222677296b1bb0f6bbad748ba3b23c702d3c
Parents: 72702ee
Author: zhangduo <zh...@apache.org>
Authored: Fri Dec 15 21:06:44 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Fri Jan 12 21:42:57 2018 +0800
----------------------------------------------------------------------
.../hbase/procedure2/LockedResourceType.java | 4 +-
.../procedure2/RemoteProcedureDispatcher.java | 23 +-
.../src/main/protobuf/Admin.proto | 9 +-
.../src/main/protobuf/MasterProcedure.proto | 30 +++
.../src/main/protobuf/RegionServerStatus.proto | 15 ++
.../apache/hadoop/hbase/executor/EventType.java | 26 ++-
.../hadoop/hbase/executor/ExecutorType.java | 3 +-
.../org/apache/hadoop/hbase/master/HMaster.java | 33 ++-
.../hadoop/hbase/master/MasterRpcServices.java | 13 ++
.../assignment/RegionTransitionProcedure.java | 18 +-
.../procedure/MasterProcedureScheduler.java | 224 +++++++++++++------
.../procedure/PeerProcedureInterface.java | 34 +++
.../master/procedure/RSProcedureDispatcher.java | 101 +++++----
.../master/replication/ModifyPeerProcedure.java | 127 +++++++++++
.../master/replication/RefreshPeerCallable.java | 67 ++++++
.../replication/RefreshPeerProcedure.java | 197 ++++++++++++++++
.../hbase/procedure2/RSProcedureCallable.java | 43 ++++
.../hbase/regionserver/HRegionServer.java | 90 ++++++--
.../hbase/regionserver/RSRpcServices.java | 56 +++--
.../handler/RSProcedureHandler.java | 51 +++++
.../assignment/TestAssignmentManager.java | 20 +-
.../replication/DummyModifyPeerProcedure.java | 41 ++++
.../TestDummyModifyPeerProcedure.java | 80 +++++++
.../security/access/TestAccessController.java | 1 +
24 files changed, 1122 insertions(+), 184 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/a92d2226/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java
index c5fe62b..dc9b5d4 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -22,5 +22,5 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public enum LockedResourceType {
- SERVER, NAMESPACE, TABLE, REGION
+ SERVER, NAMESPACE, TABLE, REGION, PEER
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a92d2226/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
index 71932b8..78c49fb 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
@@ -226,13 +226,30 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
/**
* Remote procedure reference.
- * @param <TEnv>
- * @param <TRemote>
*/
public interface RemoteProcedure<TEnv, TRemote> {
+ /**
+ * For building the remote operation.
+ */
RemoteOperation remoteCallBuild(TEnv env, TRemote remote);
- void remoteCallCompleted(TEnv env, TRemote remote, RemoteOperation response);
+
+ /**
+ * Called when the executeProcedure call has failed.
+ */
void remoteCallFailed(TEnv env, TRemote remote, IOException exception);
+
+ /**
+ * Called when the RS reports, through the {@code reportProcedureDone} method, that the remote
+ * procedure has succeeded.
+ */
+ void remoteOperationCompleted(TEnv env);
+
+ /**
+ * Called when the RS reports, through the {@code reportProcedureDone} method, that the remote
+ * procedure has failed.
+ * @param error the error message
+ */
+ void remoteOperationFailed(TEnv env, String error);
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/a92d2226/hbase-protocol-shaded/src/main/protobuf/Admin.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/Admin.proto b/hbase-protocol-shaded/src/main/protobuf/Admin.proto
index 118c79b..ddcc266 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Admin.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Admin.proto
@@ -256,14 +256,19 @@ message ClearRegionBlockCacheResponse {
required CacheEvictionStats stats = 1;
}
+message RemoteProcedureRequest {
+ required uint64 proc_id = 1;
+ required string proc_class = 2;
+ optional bytes proc_data = 3;
+}
+
message ExecuteProceduresRequest {
repeated OpenRegionRequest open_region = 1;
repeated CloseRegionRequest close_region = 2;
+ repeated RemoteProcedureRequest proc = 3;
}
message ExecuteProceduresResponse {
- repeated OpenRegionResponse open_region = 1;
- repeated CloseRegionResponse close_region = 2;
}
service AdminService {
http://git-wip-us.apache.org/repos/asf/hbase/blob/a92d2226/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index f9b8807..0e2bdba 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -365,3 +365,33 @@ message GCMergedRegionsStateData {
required RegionInfo parent_b = 2;
required RegionInfo merged_child = 3;
}
+
+enum PeerModificationState {
+ UPDATE_PEER_STORAGE = 1;
+ REFRESH_PEER_ON_RS = 2;
+ POST_PEER_MODIFICATION = 3;
+}
+
+message PeerModificationStateData {
+ required string peer_id = 1;
+}
+
+enum PeerModificationType {
+ ADD_PEER = 1;
+ REMOVE_PEER = 2;
+ ENABLE_PEER = 3;
+ DISABLE_PEER = 4;
+ UPDATE_PEER_CONFIG = 5;
+}
+
+message RefreshPeerStateData {
+ required string peer_id = 1;
+ required PeerModificationType type = 2;
+ required ServerName target_server = 3;
+}
+
+message RefreshPeerParameter {
+ required string peer_id = 1;
+ required PeerModificationType type = 2;
+ required ServerName target_server = 3;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/a92d2226/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto b/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto
index f83bb20..eb396ac 100644
--- a/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/RegionServerStatus.proto
@@ -143,7 +143,19 @@ message RegionSpaceUseReportRequest {
}
message RegionSpaceUseReportResponse {
+}
+message ReportProcedureDoneRequest {
+ required uint64 proc_id = 1;
+ enum Status {
+ SUCCESS = 1;
+ ERROR = 2;
+ }
+ required Status status = 2;
+ optional string error = 3;
+}
+
+message ReportProcedureDoneResponse {
}
service RegionServerStatusService {
@@ -181,4 +193,7 @@ service RegionServerStatusService {
*/
rpc ReportRegionSpaceUse(RegionSpaceUseReportRequest)
returns(RegionSpaceUseReportResponse);
+
+ rpc ReportProcedureDone(ReportProcedureDoneRequest)
+ returns(ReportProcedureDoneResponse);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a92d2226/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
index 26fb63a..922deb8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
@@ -20,15 +20,14 @@ package org.apache.hadoop.hbase.executor;
import org.apache.yetus.audience.InterfaceAudience;
/**
- * List of all HBase event handler types. Event types are named by a
- * convention: event type names specify the component from which the event
- * originated and then where its destined -- e.g. RS2ZK_ prefix means the
- * event came from a regionserver destined for zookeeper -- and then what
- * the even is; e.g. REGION_OPENING.
- *
- * <p>We give the enums indices so we can add types later and keep them
- * grouped together rather than have to add them always to the end as we
- * would have to if we used raw enum ordinals.
+ * List of all HBase event handler types.
+ * <p>
+ * Event types are named by a convention: event type names specify the component from which the
+ * event originated and then where it is destined -- e.g. RS_ZK_ prefix means the event came from a
+ * regionserver destined for zookeeper -- and then what the event is; e.g. REGION_OPENING.
+ * <p>
+ * We give the enums indices so we can add types later and keep them grouped together rather than
+ * have to add them always to the end as we would have to if we used raw enum ordinals.
*/
@InterfaceAudience.Private
public enum EventType {
@@ -275,7 +274,14 @@ public enum EventType {
*
* RS_COMPACTED_FILES_DISCHARGER
*/
- RS_COMPACTED_FILES_DISCHARGER (83, ExecutorType.RS_COMPACTED_FILES_DISCHARGER);
+ RS_COMPACTED_FILES_DISCHARGER (83, ExecutorType.RS_COMPACTED_FILES_DISCHARGER),
+
+ /**
+ * RS refresh peer.<br>
+ *
+ * RS_REFRESH_PEER
+ */
+ RS_REFRESH_PEER (84, ExecutorType.RS_REFRESH_PEER);
private final int code;
private final ExecutorType executor;
http://git-wip-us.apache.org/repos/asf/hbase/blob/a92d2226/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
index c75a0a9..7f130d1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
@@ -46,7 +46,8 @@ public enum ExecutorType {
RS_LOG_REPLAY_OPS (27),
RS_REGION_REPLICA_FLUSH_OPS (28),
RS_COMPACTED_FILES_DISCHARGER (29),
- RS_OPEN_PRIORITY_REGION (30);
+ RS_OPEN_PRIORITY_REGION (30),
+ RS_REFRESH_PEER (31);
ExecutorType(int value) {
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a92d2226/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index ee7cd18..44a9cdf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -139,6 +138,7 @@ import org.apache.hadoop.hbase.procedure2.LockedResource;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.quotas.MasterSpaceQuotaObserver;
@@ -327,8 +327,7 @@ public class HMaster extends HRegionServer implements MasterServices {
// flag set after we become the active master (used for testing)
private volatile boolean activeMaster = false;
- // flag set after we complete initialization once active,
- // it is not private since it's used in unit tests
+ // flag set after we complete initialization once active
private final ProcedureEvent initialized = new ProcedureEvent("master initialized");
// flag set after master services are started,
@@ -3529,4 +3528,28 @@ public class HMaster extends HRegionServer implements MasterServices {
public SpaceQuotaSnapshotNotifier getSpaceQuotaSnapshotNotifier() {
return this.spaceQuotaSnapshotNotifier;
}
-}
+
+ @SuppressWarnings("unchecked")
+ private RemoteProcedure<MasterProcedureEnv, ?> getRemoteProcedure(long procId) {
+ Procedure<?> procedure = procedureExecutor.getProcedure(procId);
+ if (procedure == null) {
+ return null;
+ }
+ assert procedure instanceof RemoteProcedure;
+ return (RemoteProcedure<MasterProcedureEnv, ?>) procedure;
+ }
+
+ public void remoteProcedureCompleted(long procId) {
+ RemoteProcedure<MasterProcedureEnv, ?> procedure = getRemoteProcedure(procId);
+ if (procedure != null) {
+ procedure.remoteOperationCompleted(procedureExecutor.getEnvironment());
+ }
+ }
+
+ public void remoteProcedureFailed(long procId, String error) {
+ RemoteProcedure<MasterProcedureEnv, ?> procedure = getRemoteProcedure(procId);
+ if (procedure != null) {
+ procedure.remoteOperationFailed(procedureExecutor.getEnvironment(), error);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/a92d2226/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 907ca9b..f875e20 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -264,6 +264,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
@@ -2247,4 +2249,15 @@ public class MasterRpcServices extends RSRpcServices
}
return response.build();
}
+
+ @Override
+ public ReportProcedureDoneResponse reportProcedureDone(RpcController controller,
+ ReportProcedureDoneRequest request) throws ServiceException {
+ if (request.getStatus() == ReportProcedureDoneRequest.Status.SUCCESS) {
+ master.remoteProcedureCompleted(request.getProcId());
+ } else {
+ master.remoteProcedureFailed(request.getProcId(), request.getError());
+ }
+ return ReportProcedureDoneResponse.getDefaultInstance();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a92d2226/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
index 4a88e3b..04dccc4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java
@@ -173,12 +173,6 @@ public abstract class RegionTransitionProcedure
RegionStateNode regionNode, IOException exception);
@Override
- public void remoteCallCompleted(final MasterProcedureEnv env,
- final ServerName serverName, final RemoteOperation response) {
- // Ignore the response? reportTransition() is the one that count?
- }
-
- @Override
public void remoteCallFailed(final MasterProcedureEnv env,
final ServerName serverName, final IOException exception) {
final RegionStateNode regionNode = getRegionState(env);
@@ -413,4 +407,16 @@ public abstract class RegionTransitionProcedure
* @return ServerName the Assign or Unassign is going against.
*/
public abstract ServerName getServer(final MasterProcedureEnv env);
+
+ @Override
+ public void remoteOperationCompleted(MasterProcedureEnv env) {
+ // should not be called for region operation until we modify the open/close region procedure
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void remoteOperationFailed(MasterProcedureEnv env, String error) {
+ // should not be called for region operation until we modify the open/close region procedure
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a92d2226/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
index dc9c69d..8ff2d12 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
@@ -24,7 +24,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
+import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.locking.LockProcedure;
+import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType;
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler;
import org.apache.hadoop.hbase.procedure2.LockAndQueue;
@@ -109,12 +110,17 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
new ServerQueueKeyComparator();
private final static TableQueueKeyComparator TABLE_QUEUE_KEY_COMPARATOR =
new TableQueueKeyComparator();
+ private final static PeerQueueKeyComparator PEER_QUEUE_KEY_COMPARATOR =
+ new PeerQueueKeyComparator();
private final FairQueue<ServerName> serverRunQueue = new FairQueue<>();
private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
+ private final FairQueue<String> peerRunQueue = new FairQueue<>();
private final ServerQueue[] serverBuckets = new ServerQueue[128];
private TableQueue tableMap = null;
+ private PeerQueue peerMap = null;
+
private final SchemaLocking locking = new SchemaLocking();
/**
@@ -161,6 +167,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
} else if (isServerProcedure(proc)) {
doAdd(serverRunQueue, getServerQueue(getServerName(proc)), proc, addFront);
+ } else if (isPeerProcedure(proc)) {
+ doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
} else {
// TODO: at the moment we only have Table and Server procedures
// if you are implementing a non-table/non-server procedure, you have two options: create
@@ -172,7 +180,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
}
private <T extends Comparable<T>> void doAdd(final FairQueue<T> fairq,
- final Queue<T> queue, final Procedure proc, final boolean addFront) {
+ final Queue<T> queue, final Procedure<?> proc, final boolean addFront) {
queue.add(proc, addFront);
if (!queue.getLockStatus().hasExclusiveLock() || queue.getLockStatus().isLockOwner(proc.getProcId())) {
// if the queue was not remove for an xlock execution
@@ -189,7 +197,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
@Override
protected boolean queueHasRunnables() {
- return tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables();
+ return tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables() ||
+ peerRunQueue.hasRunnables();
}
@Override
@@ -197,7 +206,10 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
// For now, let server handling have precedence over table handling; presumption is that it
// is more important handling crashed servers than it is running the
// enabling/disabling tables, etc.
- Procedure pollResult = doPoll(serverRunQueue);
+ Procedure<?> pollResult = doPoll(serverRunQueue);
+ if (pollResult == null) {
+ pollResult = doPoll(peerRunQueue);
+ }
if (pollResult == null) {
pollResult = doPoll(tableRunQueue);
}
@@ -267,60 +279,30 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
exclusiveLockOwnerProcedure, sharedLockCount, waitingProcedures);
}
+ private <T> void addToLockedResources(List<LockedResource> lockedResources,
+ Map<T, LockAndQueue> locks, Function<T, String> keyTransformer,
+ LockedResourceType resourcesType) {
+ locks.entrySet().stream().filter(e -> e.getValue().isLocked())
+ .map(
+ e -> createLockedResource(resourcesType, keyTransformer.apply(e.getKey()), e.getValue()))
+ .forEachOrdered(lockedResources::add);
+ }
+
@Override
public List<LockedResource> getLocks() {
schedLock();
-
try {
List<LockedResource> lockedResources = new ArrayList<>();
-
- for (Entry<ServerName, LockAndQueue> entry : locking.serverLocks
- .entrySet()) {
- String serverName = entry.getKey().getServerName();
- LockAndQueue queue = entry.getValue();
-
- if (queue.isLocked()) {
- LockedResource lockedResource =
- createLockedResource(LockedResourceType.SERVER, serverName, queue);
- lockedResources.add(lockedResource);
- }
- }
-
- for (Entry<String, LockAndQueue> entry : locking.namespaceLocks
- .entrySet()) {
- String namespaceName = entry.getKey();
- LockAndQueue queue = entry.getValue();
-
- if (queue.isLocked()) {
- LockedResource lockedResource =
- createLockedResource(LockedResourceType.NAMESPACE, namespaceName, queue);
- lockedResources.add(lockedResource);
- }
- }
-
- for (Entry<TableName, LockAndQueue> entry : locking.tableLocks
- .entrySet()) {
- String tableName = entry.getKey().getNameAsString();
- LockAndQueue queue = entry.getValue();
-
- if (queue.isLocked()) {
- LockedResource lockedResource =
- createLockedResource(LockedResourceType.TABLE, tableName, queue);
- lockedResources.add(lockedResource);
- }
- }
-
- for (Entry<String, LockAndQueue> entry : locking.regionLocks.entrySet()) {
- String regionName = entry.getKey();
- LockAndQueue queue = entry.getValue();
-
- if (queue.isLocked()) {
- LockedResource lockedResource =
- createLockedResource(LockedResourceType.REGION, regionName, queue);
- lockedResources.add(lockedResource);
- }
- }
-
+ addToLockedResources(lockedResources, locking.serverLocks, sn -> sn.getServerName(),
+ LockedResourceType.SERVER);
+ addToLockedResources(lockedResources, locking.namespaceLocks, Function.identity(),
+ LockedResourceType.NAMESPACE);
+ addToLockedResources(lockedResources, locking.tableLocks, tn -> tn.getNameAsString(),
+ LockedResourceType.TABLE);
+ addToLockedResources(lockedResources, locking.regionLocks, Function.identity(),
+ LockedResourceType.REGION);
+ addToLockedResources(lockedResources, locking.peerLocks, Function.identity(),
+ LockedResourceType.PEER);
return lockedResources;
} finally {
schedUnlock();
@@ -328,8 +310,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
}
@Override
- public LockedResource getLockResource(LockedResourceType resourceType,
- String resourceName) {
+ public LockedResource getLockResource(LockedResourceType resourceType, String resourceName) {
LockAndQueue queue = null;
schedLock();
try {
@@ -346,8 +327,10 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
case REGION:
queue = locking.regionLocks.get(resourceName);
break;
+ case PEER:
+ queue = locking.peerLocks.get(resourceName);
+ break;
}
-
return queue != null ? createLockedResource(resourceType, resourceName, queue) : null;
} finally {
schedUnlock();
@@ -431,6 +414,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
markTableAsDeleted(iProcTable.getTableName(), proc);
return;
}
+ } else if (proc instanceof PeerProcedureInterface) {
+ PeerProcedureInterface iProcPeer = (PeerProcedureInterface) proc;
+ if (iProcPeer.getPeerOperationType() == PeerOperationType.REMOVE) {
+ removePeerQueue(iProcPeer.getPeerId());
+ }
} else {
// No cleanup for ServerProcedureInterface types, yet.
return;
@@ -468,12 +456,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
locking.removeTableLock(tableName);
}
-
- private static boolean isTableProcedure(Procedure proc) {
+ private static boolean isTableProcedure(Procedure<?> proc) {
return proc instanceof TableProcedureInterface;
}
- private static TableName getTableName(Procedure proc) {
+ private static TableName getTableName(Procedure<?> proc) {
return ((TableProcedureInterface)proc).getTableName();
}
@@ -494,15 +481,42 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
return Math.abs(hashCode) % buckets.length;
}
- private static boolean isServerProcedure(Procedure proc) {
+ private static boolean isServerProcedure(Procedure<?> proc) {
return proc instanceof ServerProcedureInterface;
}
- private static ServerName getServerName(Procedure proc) {
+ private static ServerName getServerName(Procedure<?> proc) {
return ((ServerProcedureInterface)proc).getServerName();
}
// ============================================================================
+ // Peer Queue Lookup Helpers
+ // ============================================================================
+ private PeerQueue getPeerQueue(String peerId) {
+ PeerQueue node = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
+ if (node != null) {
+ return node;
+ }
+ node = new PeerQueue(peerId, locking.getPeerLock(peerId));
+ peerMap = AvlTree.insert(peerMap, node);
+ return node;
+ }
+
+ private void removePeerQueue(String peerId) {
+ peerMap = AvlTree.remove(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
+ locking.removePeerLock(peerId);
+ }
+
+
+ private static boolean isPeerProcedure(Procedure<?> proc) {
+ return proc instanceof PeerProcedureInterface;
+ }
+
+ private static String getPeerId(Procedure<?> proc) {
+ return ((PeerProcedureInterface) proc).getPeerId();
+ }
+
+ // ============================================================================
// Table and Server Queue Implementation
// ============================================================================
private static class ServerQueueKeyComparator implements AvlKeyComparator<ServerQueue> {
@@ -571,6 +585,26 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
}
}
+ private static class PeerQueueKeyComparator implements AvlKeyComparator<PeerQueue> {
+
+ @Override
+ public int compareKey(PeerQueue node, Object key) {
+ return node.compareKey((String) key);
+ }
+ }
+
+ public static class PeerQueue extends Queue<String> {
+
+ public PeerQueue(String peerId, LockStatus lockStatus) {
+ super(peerId, lockStatus);
+ }
+
+ @Override
+ public boolean requireExclusiveLock(Procedure proc) {
+ return requirePeerExclusiveLock((PeerProcedureInterface) proc);
+ }
+ }
+
// ============================================================================
// Table Locking Helpers
// ============================================================================
@@ -958,7 +992,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
* @param serverName Server to lock
* @return true if the procedure has to wait for the server to be available
*/
- public boolean waitServerExclusiveLock(final Procedure procedure, final ServerName serverName) {
+ public boolean waitServerExclusiveLock(final Procedure<?> procedure,
+ final ServerName serverName) {
schedLock();
try {
final LockAndQueue lock = locking.getServerLock(serverName);
@@ -980,7 +1015,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
* @param procedure the procedure releasing the lock
* @param serverName the server that has the exclusive lock
*/
- public void wakeServerExclusiveLock(final Procedure procedure, final ServerName serverName) {
+ public void wakeServerExclusiveLock(final Procedure<?> procedure, final ServerName serverName) {
schedLock();
try {
final LockAndQueue lock = locking.getServerLock(serverName);
@@ -994,6 +1029,56 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
}
// ============================================================================
+ // Peer Locking Helpers
+ // ============================================================================
+
+ private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) {
+ return proc.getPeerOperationType() != PeerOperationType.REFRESH;
+ }
+
+ /**
+ * Try to acquire the exclusive lock on the specified peer.
+ * @see #wakePeerExclusiveLock(Procedure, String)
+ * @param procedure the procedure trying to acquire the lock
+ * @param peerId peer to lock
+ * @return true if the procedure has to wait for the peer to be available
+ */
+ public boolean waitPeerExclusiveLock(Procedure<?> procedure, String peerId) {
+ schedLock();
+ try {
+ final LockAndQueue lock = locking.getPeerLock(peerId);
+ if (lock.tryExclusiveLock(procedure)) {
+ removeFromRunQueue(peerRunQueue, getPeerQueue(peerId));
+ return false;
+ }
+ waitProcedure(lock, procedure);
+ logLockedResource(LockedResourceType.PEER, peerId);
+ return true;
+ } finally {
+ schedUnlock();
+ }
+ }
+
+ /**
+ * Wake the procedures waiting for the specified peer
+ * @see #waitPeerExclusiveLock(Procedure, String)
+ * @param procedure the procedure releasing the lock
+ * @param peerId the peer that has the exclusive lock
+ */
+ public void wakePeerExclusiveLock(Procedure<?> procedure, String peerId) {
+ schedLock();
+ try {
+ final LockAndQueue lock = locking.getPeerLock(peerId);
+ lock.releaseExclusiveLock(procedure);
+ addToRunQueue(peerRunQueue, getPeerQueue(peerId));
+ int waitingCount = wakeWaitingProcedures(lock);
+ wakePollIfNeeded(waitingCount);
+ } finally {
+ schedUnlock();
+ }
+ }
+
+ // ============================================================================
// Generic Helpers
// ============================================================================
private static abstract class Queue<TKey extends Comparable<TKey>>
@@ -1098,6 +1183,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
final Map<TableName, LockAndQueue> tableLocks = new HashMap<>();
// Single map for all regions irrespective of tables. Key is encoded region name.
final Map<String, LockAndQueue> regionLocks = new HashMap<>();
+ final Map<String, LockAndQueue> peerLocks = new HashMap<>();
private <T> LockAndQueue getLock(Map<T, LockAndQueue> map, T key) {
LockAndQueue lock = map.get(key);
@@ -1132,6 +1218,14 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
return getLock(serverLocks, serverName);
}
+ LockAndQueue getPeerLock(String peerId) {
+ return getLock(peerLocks, peerId); // peerLocks lookup via getLock; presumably creates a missing entry - TODO confirm
+ }
+
+ LockAndQueue removePeerLock(String peerId) {
+ return peerLocks.remove(peerId); // drops the peer's entry; null when peerLocks holds nothing for peerId
+ }
+
/**
* Removes all locks by clearing the maps.
* Used when procedure executor is stopped for failure and recovery testing.
@@ -1142,6 +1236,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
namespaceLocks.clear();
tableLocks.clear();
regionLocks.clear();
+ peerLocks.clear();
}
@Override
@@ -1149,7 +1244,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
return "serverLocks=" + filterUnlocked(this.serverLocks) +
", namespaceLocks=" + filterUnlocked(this.namespaceLocks) +
", tableLocks=" + filterUnlocked(this.tableLocks) +
- ", regionLocks=" + filterUnlocked(this.regionLocks);
+ ", regionLocks=" + filterUnlocked(this.regionLocks) +
+ ", peerLocks=" + filterUnlocked(this.peerLocks);
}
private String filterUnlocked(Map<?, LockAndQueue> locks) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/a92d2226/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java
new file mode 100644
index 0000000..4abc9ad
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * Implemented by procedures that operate on a peer, exposing the peer id and the kind of
+ * operation being performed.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface PeerProcedureInterface {
+
+ /**
+ * The kinds of operation a peer procedure can perform.
+ */
+ enum PeerOperationType {
+ ADD,
+ REMOVE,
+ ENABLE,
+ DISABLE,
+ UPDATE_CONFIG,
+ REFRESH
+ }
+
+ /**
+ * @return the id of the peer this procedure operates on
+ */
+ String getPeerId();
+
+ /**
+ * @return the type of operation this procedure performs
+ */
+ PeerOperationType getPeerOperationType();
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a92d2226/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
index 65c4d08..0f68f31 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
@@ -24,7 +24,6 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
-
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
@@ -36,10 +35,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
@@ -49,6 +45,13 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProc
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
/**
* A remote procedure dispatcher for regionservers.
@@ -222,7 +225,10 @@ public class RSProcedureDispatcher
private interface RemoteProcedureResolver {
void dispatchOpenRequests(MasterProcedureEnv env, List<RegionOpenOperation> operations);
+
void dispatchCloseRequests(MasterProcedureEnv env, List<RegionCloseOperation> operations);
+
+ void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations);
}
/**
@@ -231,22 +237,28 @@ public class RSProcedureDispatcher
* Then {@code resolver} is used to dispatch {@link RegionOpenOperation}s,
* {@link RegionCloseOperation}s and {@link ServerOperation}s.
* @param serverName RegionServer to which the remote operations are sent
- * @param remoteProcedures Remote procedures which are dispatched to the given server
+ * @param operations Remote procedures which are dispatched to the given server
* @param resolver Used to dispatch remote procedures to given server.
*/
- public void splitAndResolveOperation(final ServerName serverName,
- final Set<RemoteProcedure> remoteProcedures, final RemoteProcedureResolver resolver) {
- final ArrayListMultimap<Class<?>, RemoteOperation> reqsByType =
- buildAndGroupRequestByType(procedureEnv, serverName, remoteProcedures);
+ public void splitAndResolveOperation(ServerName serverName, Set<RemoteProcedure> operations,
+ RemoteProcedureResolver resolver) {
+ MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment();
+ ArrayListMultimap<Class<?>, RemoteOperation> reqsByType =
+ buildAndGroupRequestByType(env, serverName, operations);
- final List<RegionOpenOperation> openOps = fetchType(reqsByType, RegionOpenOperation.class);
+ List<RegionOpenOperation> openOps = fetchType(reqsByType, RegionOpenOperation.class);
if (!openOps.isEmpty()) {
- resolver.dispatchOpenRequests(procedureEnv, openOps);
+ resolver.dispatchOpenRequests(env, openOps);
}
- final List<RegionCloseOperation> closeOps = fetchType(reqsByType, RegionCloseOperation.class);
+ List<RegionCloseOperation> closeOps = fetchType(reqsByType, RegionCloseOperation.class);
if (!closeOps.isEmpty()) {
- resolver.dispatchCloseRequests(procedureEnv, closeOps);
+ resolver.dispatchCloseRequests(env, closeOps);
+ }
+
+ List<ServerOperation> refreshOps = fetchType(reqsByType, ServerOperation.class);
+ if (!refreshOps.isEmpty()) {
+ resolver.dispatchServerOperations(env, refreshOps);
}
if (!reqsByType.isEmpty()) {
@@ -277,8 +289,7 @@ public class RSProcedureDispatcher
splitAndResolveOperation(getServerName(), remoteProcedures, this);
try {
- final ExecuteProceduresResponse response = sendRequest(getServerName(), request.build());
- remoteCallCompleted(procedureEnv, response);
+ sendRequest(getServerName(), request.build());
} catch (IOException e) {
e = unwrapException(e);
// TODO: In the future some operation may want to bail out early.
@@ -302,6 +313,11 @@ public class RSProcedureDispatcher
}
}
+ /**
+ * Builds the request of each server operation and adds it to {@code request}, in list order.
+ */
+ @Override
+ public void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations) {
+ for (ServerOperation operation : operations) {
+ request.addProc(operation.buildRequest());
+ }
+ }
+
protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
final ExecuteProceduresRequest request) throws IOException {
try {
@@ -311,17 +327,8 @@ public class RSProcedureDispatcher
}
}
-
- private void remoteCallCompleted(final MasterProcedureEnv env,
- final ExecuteProceduresResponse response) {
- /*
- for (RemoteProcedure proc: operations) {
- proc.remoteCallCompleted(env, getServerName(), response);
- }*/
- }
-
private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
- for (RemoteProcedure proc: remoteProcedures) {
+ for (RemoteProcedure proc : remoteProcedures) {
proc.remoteCallFailed(env, getServerName(), e);
}
}
@@ -362,8 +369,7 @@ public class RSProcedureDispatcher
buildOpenRegionRequest(procedureEnv, getServerName(), operations);
try {
- OpenRegionResponse response = sendRequest(getServerName(), request);
- remoteCallCompleted(procedureEnv, response);
+ sendRequest(getServerName(), request);
} catch (IOException e) {
e = unwrapException(e);
// TODO: In the future some operation may want to bail out early.
@@ -384,16 +390,6 @@ public class RSProcedureDispatcher
}
}
- private void remoteCallCompleted(final MasterProcedureEnv env,
- final OpenRegionResponse response) {
- int index = 0;
- for (RegionOpenOperation op: operations) {
- OpenRegionResponse.RegionOpeningState state = response.getOpeningState(index++);
- op.setFailedOpen(state == OpenRegionResponse.RegionOpeningState.FAILED_OPENING);
- op.getRemoteProcedure().remoteCallCompleted(env, getServerName(), op);
- }
- }
-
private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
for (RegionOpenOperation op: operations) {
op.getRemoteProcedure().remoteCallFailed(env, getServerName(), e);
@@ -443,7 +439,6 @@ public class RSProcedureDispatcher
private void remoteCallCompleted(final MasterProcedureEnv env,
final CloseRegionResponse response) {
operation.setClosed(response.getClosed());
- operation.getRemoteProcedure().remoteCallCompleted(env, getServerName(), operation);
}
private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) {
@@ -482,6 +477,11 @@ public class RSProcedureDispatcher
submitTask(new CloseRegionRemoteCall(serverName, op));
}
}
+
+ @Override
+ public void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations) {
+ throw new UnsupportedOperationException(); // this resolver always rejects server operations
+ }
}
// ==========================================================================
@@ -489,13 +489,28 @@ public class RSProcedureDispatcher
// - ServerOperation: refreshConfig, grant, revoke, ... (TODO)
// - RegionOperation: open, close, flush, snapshot, ...
// ==========================================================================
- /* Currently unused
- public static abstract class ServerOperation extends RemoteOperation {
- protected ServerOperation(final RemoteProcedure remoteProcedure) {
+
+ public static final class ServerOperation extends RemoteOperation {
+
+ private final long procId;
+
+ private final Class<?> rsProcClass;
+
+ private final byte[] rsProcData;
+
+ public ServerOperation(RemoteProcedure remoteProcedure, long procId, Class<?> rsProcClass,
+ byte[] rsProcData) {
super(remoteProcedure);
+ this.procId = procId;
+ this.rsProcClass = rsProcClass;
+ this.rsProcData = rsProcData;
+ }
+
+ public RemoteProcedureRequest buildRequest() {
+ return RemoteProcedureRequest.newBuilder().setProcId(procId)
+ .setProcClass(rsProcClass.getName()).setProcData(ByteString.copyFrom(rsProcData)).build();
}
}
- */
public static abstract class RegionOperation extends RemoteOperation {
private final RegionInfo regionInfo;
http://git-wip-us.apache.org/repos/asf/hbase/blob/a92d2226/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
new file mode 100644
index 0000000..fca05a7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
+
+/**
+ * Base class for procedures that modify a peer.
+ * <p>
+ * The state machine runs three steps: update the peer storage, schedule one
+ * {@link RefreshPeerProcedure} per online region server as child procedures, then invoke the
+ * {@link #postPeerModification()} hook. The procedure holds the peer exclusive lock of the
+ * scheduler while it runs.
+ */
+@InterfaceAudience.Private
+public abstract class ModifyPeerProcedure
+ extends StateMachineProcedure<MasterProcedureEnv, PeerModificationState>
+ implements PeerProcedureInterface {
+
+ private static final Log LOG = LogFactory.getLog(ModifyPeerProcedure.class);
+
+ /** Id of the peer being modified. */
+ protected String peerId;
+
+ /** No-arg constructor. */
+ protected ModifyPeerProcedure() {
+ }
+
+ /**
+ * @param peerId id of the peer to modify
+ */
+ protected ModifyPeerProcedure(String peerId) {
+ this.peerId = peerId;
+ }
+
+ @Override
+ public String getPeerId() {
+ return peerId;
+ }
+
+ /**
+ * Applies the modification to the peer storage.
+ * <p>
+ * When returning {@code false}, the implementation needs to call
+ * {@link #setFailure(String, Throwable)} to give the detailed failure information.
+ * @return {@code false} if the operation is invalid and we should give up, {@code true}
+ * otherwise
+ * @throws IOException if the update failed; the step is then retried
+ */
+ protected abstract boolean updatePeerStorage() throws IOException;
+
+ /**
+ * Hook invoked in the last state of the procedure. Does nothing by default.
+ */
+ protected void postPeerModification() {
+ }
+
+ @Override
+ protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
+ throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+ switch (state) {
+ case UPDATE_PEER_STORAGE: {
+ final boolean updated;
+ try {
+ updated = updatePeerStorage();
+ } catch (IOException e) {
+ LOG.warn("update peer storage failed, retry", e);
+ throw new ProcedureYieldException();
+ }
+ if (!updated) {
+ // The subclass rejected the operation; it must have recorded the failure already.
+ assert isFailed() : "setFailure is not called";
+ return Flow.NO_MORE_STATE;
+ }
+ setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
+ return Flow.HAS_MORE_STATE;
+ }
+ case REFRESH_PEER_ON_RS: {
+ // One child procedure per region server that is online right now.
+ final RefreshPeerProcedure[] refreshProcs =
+ env.getMasterServices().getServerManager().getOnlineServersList().stream()
+ .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn))
+ .toArray(RefreshPeerProcedure[]::new);
+ addChildProcedure(refreshProcs);
+ setNextState(PeerModificationState.POST_PEER_MODIFICATION);
+ return Flow.HAS_MORE_STATE;
+ }
+ case POST_PEER_MODIFICATION: {
+ postPeerModification();
+ return Flow.NO_MORE_STATE;
+ }
+ default:
+ throw new UnsupportedOperationException("unhandled state=" + state);
+ }
+ }
+
+ @Override
+ protected LockState acquireLock(MasterProcedureEnv env) {
+ if (env.getProcedureScheduler().waitPeerExclusiveLock(this, peerId)) {
+ return LockState.LOCK_EVENT_WAIT;
+ }
+ return LockState.LOCK_ACQUIRED;
+ }
+
+ @Override
+ protected void releaseLock(MasterProcedureEnv env) {
+ env.getProcedureScheduler().wakePeerExclusiveLock(this, peerId);
+ }
+
+ /**
+ * Rollback is not supported.
+ */
+ @Override
+ protected void rollbackState(MasterProcedureEnv env, PeerModificationState state)
+ throws IOException, InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected PeerModificationState getState(int stateId) {
+ return PeerModificationState.forNumber(stateId);
+ }
+
+ @Override
+ protected int getStateId(PeerModificationState state) {
+ return state.getNumber();
+ }
+
+ @Override
+ protected PeerModificationState getInitialState() {
+ return PeerModificationState.UPDATE_PEER_STORAGE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a92d2226/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerCallable.java
new file mode 100644
index 0000000..4e09107
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerCallable.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerParameter;
+
+/**
+ * The callable executed at RS side to refresh the peer config/state.
+ * <p>
+ * TODO: only a dummy implementation for verifying the framework, will add implementation later.
+ */
+@InterfaceAudience.Private
+public class RefreshPeerCallable implements RSProcedureCallable {
+
+ private HRegionServer rs;
+
+ private String peerId;
+
+ private Exception initError;
+
+ @Override
+ public Void call() throws Exception {
+ if (initError != null) {
+ throw initError;
+ }
+ rs.getFileSystem().create(new Path("/" + peerId + "/" + rs.getServerName().toString())).close();
+ return null;
+ }
+
+ @Override
+ public void init(byte[] parameter, HRegionServer rs) {
+ this.rs = rs;
+ try {
+ this.peerId = RefreshPeerParameter.parseFrom(parameter).getPeerId();
+ } catch (InvalidProtocolBufferException e) {
+ initError = e;
+ return;
+ }
+ }
+
+ @Override
+ public EventType getEventType() {
+ return EventType.RS_REFRESH_PEER;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a92d2226/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
new file mode 100644
index 0000000..18da487
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
+import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationType;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerParameter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerStateData;
+
+@InterfaceAudience.Private
+public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
+ implements PeerProcedureInterface, RemoteProcedure<MasterProcedureEnv, ServerName> {
+
+ private static final Log LOG = LogFactory.getLog(RefreshPeerProcedure.class);
+
+ private String peerId;
+
+ private PeerOperationType type;
+
+ private ServerName targetServer;
+
+ private boolean dispatched;
+
+ private ProcedureEvent<?> event;
+
+ private boolean succ;
+
+ public RefreshPeerProcedure() {
+ }
+
+ public RefreshPeerProcedure(String peerId, PeerOperationType type, ServerName targetServer) {
+ this.peerId = peerId;
+ this.type = type;
+ this.targetServer = targetServer;
+ }
+
+ @Override
+ public String getPeerId() {
+ return peerId;
+ }
+
+ @Override
+ public PeerOperationType getPeerOperationType() {
+ return PeerOperationType.REFRESH;
+ }
+
+ private static PeerModificationType toPeerModificationType(PeerOperationType type) {
+ switch (type) {
+ case ADD:
+ return PeerModificationType.ADD_PEER;
+ case REMOVE:
+ return PeerModificationType.REMOVE_PEER;
+ case ENABLE:
+ return PeerModificationType.ENABLE_PEER;
+ case DISABLE:
+ return PeerModificationType.DISABLE_PEER;
+ case UPDATE_CONFIG:
+ return PeerModificationType.UPDATE_PEER_CONFIG;
+ default:
+ throw new IllegalArgumentException("Unknown type: " + type);
+ }
+ }
+
+ private static PeerOperationType toPeerOperationType(PeerModificationType type) {
+ switch (type) {
+ case ADD_PEER:
+ return PeerOperationType.ADD;
+ case REMOVE_PEER:
+ return PeerOperationType.REMOVE;
+ case ENABLE_PEER:
+ return PeerOperationType.ENABLE;
+ case DISABLE_PEER:
+ return PeerOperationType.DISABLE;
+ case UPDATE_PEER_CONFIG:
+ return PeerOperationType.UPDATE_CONFIG;
+ default:
+ throw new IllegalArgumentException("Unknown type: " + type);
+ }
+ }
+
+ @Override
+ public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
+ assert targetServer.equals(remote);
+ return new ServerOperation(this, getProcId(), RefreshPeerCallable.class,
+ RefreshPeerParameter.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type))
+ .setTargetServer(ProtobufUtil.toServerName(remote)).build().toByteArray());
+ }
+
+ private void complete(MasterProcedureEnv env, boolean succ) {
+ if (event == null) {
+ LOG.warn("procedure event for " + getProcId() +
+ " is null, maybe the procedure is created when recovery", new Exception());
+ return;
+ }
+ LOG.info("Refresh peer " + peerId + " for " + type + " on " + targetServer +
+ (succ ? " suceeded" : " failed"));
+ this.succ = succ;
+ event.wake(env.getProcedureScheduler());
+ event = null;
+ }
+
+ @Override
+ public synchronized void remoteCallFailed(MasterProcedureEnv env, ServerName remote,
+ IOException exception) {
+ complete(env, false);
+ }
+
+ @Override
+ public synchronized void remoteOperationCompleted(MasterProcedureEnv env) {
+ complete(env, true);
+ }
+
+ @Override
+ public synchronized void remoteOperationFailed(MasterProcedureEnv env, String error) {
+ complete(env, false);
+ }
+
+ @Override
+ protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
+ throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+ if (dispatched) {
+ if (succ) {
+ return null;
+ }
+ // retry
+ dispatched = false;
+ }
+ if (!env.getRemoteDispatcher().addOperationToNode(targetServer, this)) {
+ LOG.info("Can not add remote operation for refreshing peer " + peerId + " for " + type +
+ " to " + targetServer + ", this usually because the server is already dead," +
+ " give up and mark the procedure as complete");
+ return null;
+ }
+ dispatched = true;
+ event = new ProcedureEvent<>(this);
+ event.suspendIfNotReady(this);
+ throw new ProcedureSuspendedException();
+ }
+
+ @Override
+ protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected boolean abort(MasterProcedureEnv env) {
+ // TODO: no correctness problem if we just ignore this, implement later.
+ return false;
+ }
+
+ @Override
+ protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ serializer.serialize(
+ RefreshPeerStateData.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type))
+ .setTargetServer(ProtobufUtil.toServerName(targetServer)).build());
+ }
+
+ @Override
+ protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ RefreshPeerStateData data = serializer.deserialize(RefreshPeerStateData.class);
+ peerId = data.getPeerId();
+ type = toPeerOperationType(data.getType());
+ targetServer = ProtobufUtil.toServerName(data.getTargetServer());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a92d2226/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/RSProcedureCallable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/RSProcedureCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/RSProcedureCallable.java
new file mode 100644
index 0000000..62c2e36
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/RSProcedureCallable.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2;
+
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A general interface for a sub procedure that runs at the RS side.
+ */
+@InterfaceAudience.Private
+public interface RSProcedureCallable extends Callable<Void> {
+
+ /**
+ * Initialize the callable.
+ * @param parameter the parameter passed from master
+ * @param rs the regionserver instance
+ */
+ void init(byte[] parameter, HRegionServer rs);
+
+ /**
+ * Event type used to select the thread pool.
+ * @return the event type of this callable
+ */
+ EventType getEventType();
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a92d2226/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 3a52a16..03b7ef7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -17,9 +17,6 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import javax.servlet.http.HttpServlet;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.management.MemoryType;
@@ -50,7 +47,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
-
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.servlet.http.HttpServlet;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.hadoop.conf.Configuration;
@@ -117,6 +116,7 @@ import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.mob.MobCacheConfig;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
+import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
@@ -127,6 +127,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
+import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
@@ -173,14 +174,9 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
-import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
-import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
-import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
+import sun.misc.Signal;
+import sun.misc.SignalHandler;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
@@ -206,12 +202,20 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
-import sun.misc.Signal;
-import sun.misc.SignalHandler;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
+import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
/**
* HRegionServer makes a set of HRegions available to clients. It checks in with
@@ -1925,6 +1929,8 @@ public class HRegionServer extends HasThread implements
conf.getInt("hbase.regionserver.region.replica.flusher.threads",
conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
}
+ this.executorService.startExecutorService(ExecutorType.RS_REFRESH_PEER,
+ conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2));
Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
uncaughtExceptionHandler);
@@ -3706,4 +3712,60 @@ public class HRegionServer extends HasThread implements
return ConnectionUtils.createShortCircuitConnection(conf, null, user, this.serverName,
this.rpcServices, this.rpcServices);
}
+
+  /**
+   * Wraps the given callable in an {@link RSProcedureHandler} bound to this region server and
+   * submits it to the executor service.
+   * @param procId id of the procedure, handed to the handler together with the callable
+   * @param callable the work to run
+   */
+  public void executeProcedure(long procId, RSProcedureCallable callable) {
+    RSProcedureHandler handler = new RSProcedureHandler(this, procId, callable);
+    executorService.submit(handler);
+  }
+
+  /**
+   * Reports the outcome of a procedure to the master via the region server status stub.
+   * <p>
+   * The call is retried for as long as {@code keepLooping()} returns true. When the master answers
+   * with {@link ServerNotRunningYetException} or {@link PleaseHoldException} the retry is delayed
+   * with a backoff; any other failure is retried immediately. After a failed attempt the cached
+   * stub is cleared (if nobody replaced it meanwhile) so that the next iteration recreates it.
+   * @param procId id of the procedure being reported
+   * @param error {@code null} to report SUCCESS; otherwise ERROR is reported and the stack trace of
+   *          this throwable is sent along as the error text
+   */
+  public void reportProcedureDone(long procId, Throwable error) {
+    ReportProcedureDoneRequest.Builder builder =
+        ReportProcedureDoneRequest.newBuilder().setProcId(procId);
+    if (error != null) {
+      builder.setStatus(ReportProcedureDoneRequest.Status.ERROR)
+          .setError(Throwables.getStackTraceAsString(error));
+    } else {
+      builder.setStatus(ReportProcedureDoneRequest.Status.SUCCESS);
+    }
+    ReportProcedureDoneRequest request = builder.build();
+    int tries = 0;
+    long pauseTime = INIT_PAUSE_TIME_MS;
+    while (keepLooping()) {
+      // Snapshot the stub so we can tell below whether someone else already replaced it.
+      RegionServerStatusService.BlockingInterface rss = rssStub;
+      try {
+        if (rss == null) {
+          createRegionServerStatusStub();
+          continue;
+        }
+        rss.reportProcedureDone(null, request);
+        // Log if we had to retry else don't log unless TRACE. We want to
+        // know if we were successful after an attempt showed in logs as failed.
+        if (tries > 0 || LOG.isTraceEnabled()) {
+          LOG.info("PROCEDURE REPORTED " + request);
+        }
+        return;
+      } catch (ServiceException se) {
+        IOException ioe = ProtobufUtil.getRemoteException(se);
+        boolean pause =
+            ioe instanceof ServerNotRunningYetException || ioe instanceof PleaseHoldException;
+        if (pause) {
+          // Do backoff else we flood the Master with requests.
+          pauseTime = ConnectionUtils.getPauseTime(INIT_PAUSE_TIME_MS, tries);
+        } else {
+          pauseTime = INIT_PAUSE_TIME_MS; // Reset.
+        }
+        LOG.info(
+          "Failed to report procedure done " + TextFormat.shortDebugString(request) +
+              "; retry (#" + tries + ")" +
+              (pause ? " after " + pauseTime + "ms delay (Master is coming online...)."
+                  : " immediately."),
+          ioe);
+        if (pause) {
+          Threads.sleep(pauseTime);
+        }
+        tries++;
+        // Only drop the stub if it is still the one we just failed with.
+        if (rssStub == rss) {
+          rssStub = null;
+        }
+      }
+    }
+    // Reaching this point means keepLooping() returned false before the report got through; leave
+    // a trace in the log instead of returning silently.
+    LOG.warn("Gave up reporting procedure done for pid=" + procId + " after " + tries +
+        " failed attempt(s)");
+  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a92d2226/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index b6c0ebe..e88f70e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -100,6 +99,7 @@ import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.MasterRpcServices;
+import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
import org.apache.hadoop.hbase.quotas.OperationQuota;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
@@ -172,6 +172,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionR
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
@@ -3435,23 +3436,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
@Override
- public ExecuteProceduresResponse executeProcedures(RpcController controller,
- ExecuteProceduresRequest request) throws ServiceException {
- ExecuteProceduresResponse.Builder builder = ExecuteProceduresResponse.newBuilder();
- if (request.getOpenRegionCount() > 0) {
- for (OpenRegionRequest req : request.getOpenRegionList()) {
- builder.addOpenRegion(openRegion(controller, req));
- }
- }
- if (request.getCloseRegionCount() > 0) {
- for (CloseRegionRequest req : request.getCloseRegionList()) {
- builder.addCloseRegion(closeRegion(controller, req));
- }
- }
- return builder.build();
- }
-
- @Override
public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
ClearRegionBlockCacheRequest request) {
ClearRegionBlockCacheResponse.Builder builder =
@@ -3468,4 +3452,38 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
stats.withMaxCacheSize(regionServer.getCacheConfig().getBlockCache().getMaxSize());
return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
}
+
+  /**
+   * Executes a batch of requests sent by the master: every open region request is passed to
+   * {@code openRegion}, every close region request to {@code closeRegion}, and every remote
+   * procedure request is turned into an {@link RSProcedureCallable} that is handed to the region
+   * server for execution.
+   * <p>
+   * The individual open/close responses are not collected; the returned response is always the
+   * default (empty) instance.
+   * @param controller passed through to {@code openRegion} and {@code closeRegion}
+   * @param request the batch to execute
+   * @return the default {@link ExecuteProceduresResponse} instance
+   * @throws ServiceException declared for the delegated open/close calls
+   */
+  @Override
+  public ExecuteProceduresResponse executeProcedures(RpcController controller,
+      ExecuteProceduresRequest request) throws ServiceException {
+    // Iterating an empty list is a no-op, so no count checks are needed.
+    for (OpenRegionRequest req : request.getOpenRegionList()) {
+      openRegion(controller, req);
+    }
+    for (CloseRegionRequest req : request.getCloseRegionList()) {
+      closeRegion(controller, req);
+    }
+    for (RemoteProcedureRequest req : request.getProcList()) {
+      RSProcedureCallable callable;
+      try {
+        // Class.newInstance() is deprecated; go through the no-arg constructor instead.
+        callable = Class.forName(req.getProcClass()).asSubclass(RSProcedureCallable.class)
+            .getDeclaredConstructor().newInstance();
+      } catch (Exception e) {
+        // here we just ignore the error as this should not happen and we do not provide a general
+        // way to report errors for all types of remote procedure. The procedure will hang at
+        // master side but after you solve the problem and restart master it will be executed
+        // again and pass.
+        LOG.warn("create procedure of type " + req.getProcClass() + " failed for pid=" +
+            req.getProcId() + ", give up", e);
+        continue;
+      }
+      callable.init(req.getProcData().toByteArray(), regionServer);
+      regionServer.executeProcedure(req.getProcId(), callable);
+    }
+    return ExecuteProceduresResponse.getDefaultInstance();
+  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a92d2226/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java
new file mode 100644
index 0000000..94bcfec
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.handler;
+
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A event handler for running procedure.
+ */
+@InterfaceAudience.Private
+public class RSProcedureHandler extends EventHandler {
+
+ private final long procId;
+
+ private final RSProcedureCallable callable;
+
+ public RSProcedureHandler(HRegionServer rs, long procId, RSProcedureCallable callable) {
+ super(rs, callable.getEventType());
+ this.procId = procId;
+ this.callable = callable;
+ }
+
+ @Override
+ public void process() {
+ Exception error = null;
+ try {
+ callable.call();
+ } catch (Exception e) {
+ error = e;
+ }
+ ((HRegionServer) server).reportProcedureDone(procId, error);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a92d2226/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
index 3c453bc..3ab915b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java
@@ -531,26 +531,16 @@ public class TestAssignmentManager {
@Override
public ExecuteProceduresResponse sendRequest(ServerName server,
ExecuteProceduresRequest request) throws IOException {
- ExecuteProceduresResponse.Builder builder = ExecuteProceduresResponse.newBuilder();
if (request.getOpenRegionCount() > 0) {
- for (OpenRegionRequest req: request.getOpenRegionList()) {
- OpenRegionResponse.Builder resp = OpenRegionResponse.newBuilder();
- for (RegionOpenInfo openReq: req.getOpenInfoList()) {
- RegionOpeningState state = execOpenRegion(server, openReq);
- if (state != null) {
- resp.addOpeningState(state);
- }
+ for (OpenRegionRequest req : request.getOpenRegionList()) {
+ for (RegionOpenInfo openReq : req.getOpenInfoList()) {
+ execOpenRegion(server, openReq);
}
- builder.addOpenRegion(resp.build());
}
}
if (request.getCloseRegionCount() > 0) {
- for (CloseRegionRequest req: request.getCloseRegionList()) {
- CloseRegionResponse resp = execCloseRegion(server,
- req.getRegion().getValue().toByteArray());
- if (resp != null) {
- builder.addCloseRegion(resp);
- }
+ for (CloseRegionRequest req : request.getCloseRegionList()) {
+ execCloseRegion(server, req.getRegion().getValue().toByteArray());
}
}
return ExecuteProceduresResponse.newBuilder().build();
http://git-wip-us.apache.org/repos/asf/hbase/blob/a92d2226/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/DummyModifyPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/DummyModifyPeerProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/DummyModifyPeerProcedure.java
new file mode 100644
index 0000000..44343d7
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/DummyModifyPeerProcedure.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import java.io.IOException;
+
+/**
+ * Trivial {@link ModifyPeerProcedure} subclass for tests: it always reports
+ * {@link PeerOperationType#ADD} and its peer storage update does nothing and returns
+ * {@code true}.
+ */
+public class DummyModifyPeerProcedure extends ModifyPeerProcedure {
+
+  /**
+   * No-arg constructor.
+   * NOTE(review): presumably required so the procedure can be instantiated reflectively - confirm.
+   */
+  public DummyModifyPeerProcedure() {
+    super();
+  }
+
+  /**
+   * @param peerId the peer id handed to the parent procedure
+   */
+  public DummyModifyPeerProcedure(String peerId) {
+    super(peerId);
+  }
+
+  /** Always {@link PeerOperationType#ADD}. */
+  @Override
+  public PeerOperationType getPeerOperationType() {
+    return PeerOperationType.ADD;
+  }
+
+  /** Touches no storage; unconditionally returns {@code true}. */
+  @Override
+  protected boolean updatePeerStorage() throws IOException {
+    return true;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a92d2226/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDummyModifyPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDummyModifyPeerProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDummyModifyPeerProcedure.java
new file mode 100644
index 0000000..ec06306
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDummyModifyPeerProcedure.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Submits a {@link DummyModifyPeerProcedure} on a three region server mini cluster and checks that,
+ * once the procedure has finished, the directory named after the peer id holds exactly one entry
+ * per region server, each named after that server.
+ */
+@Category({ MasterTests.class, LargeTests.class })
+public class TestDummyModifyPeerProcedure {
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  // Peer id used for the procedure; also the name of the directory inspected by the test.
+  private static String PEER_ID;
+
+  // Directory "/" + PEER_ID on the test file system, created in setUp.
+  private static Path DIR;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.startMiniCluster(3);
+    PEER_ID = "testPeer";
+    DIR = new Path("/" + PEER_ID);
+    UTIL.getTestFileSystem().mkdirs(DIR);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws Exception {
+    ProcedureExecutor<?> procExecutor =
+        UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+    long pid = procExecutor.submitProcedure(new DummyModifyPeerProcedure(PEER_ID));
+    // Wait up to 30 seconds for the master to mark the procedure as finished.
+    Waiter.Predicate<Exception> finished = () -> procExecutor.isFinished(pid);
+    UTIL.waitFor(30000, finished);
+    // Names of all live region servers; entries are removed as matching files are found.
+    Set<String> pending = UTIL.getHBaseCluster().getRegionServerThreads().stream()
+        .map(rst -> rst.getRegionServer().getServerName().toString())
+        .collect(Collectors.toCollection(HashSet::new));
+    FileStatus[] entries = UTIL.getTestFileSystem().listStatus(DIR);
+    for (FileStatus entry : entries) {
+      String entryName = entry.getPath().getName();
+      // Every entry must correspond to a region server not seen before.
+      assertTrue(pending.remove(entryName));
+    }
+    // No region server may be left without an entry.
+    assertTrue(pending.isEmpty());
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a92d2226/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
index be1b0e4..99e212d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
@@ -30,6 +30,7 @@ import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
+
import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.ArrayList;