You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/08/20 22:16:28 UTC
[7/7] hbase git commit: HBASE-20881 Introduce a region transition
procedure to handle all the state transition for a region
HBASE-20881 Introduce a region transition procedure to handle all the state transition for a region
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/bb349413
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/bb349413
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/bb349413
Branch: refs/heads/master
Commit: bb3494134edb6a4e607ab199e4d0542135d83a64
Parents: 7db116a
Author: zhangduo <zh...@apache.org>
Authored: Mon Aug 20 21:02:56 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Tue Aug 21 06:12:09 2018 +0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/master/RegionState.java | 180 ++---
.../hbase/procedure2/ProcedureExecutor.java | 29 +-
.../hbase/procedure2/StateMachineProcedure.java | 2 +-
.../procedure2/ProcedureTestingUtility.java | 29 +-
.../src/main/protobuf/ClusterStatus.proto | 4 +
.../src/main/protobuf/MasterProcedure.proto | 28 +
.../hbase/rsgroup/RSGroupAdminServer.java | 2 +-
.../hbase/rsgroup/RSGroupInfoManagerImpl.java | 2 +-
.../hadoop/hbase/master/AssignmentListener.java | 42 --
.../org/apache/hadoop/hbase/master/HMaster.java | 88 ++-
.../hadoop/hbase/master/MasterDumpServlet.java | 2 +-
.../master/assignment/AssignProcedure.java | 290 +------
.../master/assignment/AssignmentManager.java | 752 +++++++++----------
.../assignment/AssignmentManagerUtil.java | 195 +++++
.../master/assignment/CloseRegionProcedure.java | 82 ++
.../assignment/MergeTableRegionsProcedure.java | 125 ++-
.../master/assignment/MoveRegionProcedure.java | 69 +-
.../master/assignment/OpenRegionProcedure.java | 67 ++
.../assignment/RegionRemoteProcedureBase.java | 157 ++++
.../master/assignment/RegionStateNode.java | 313 ++++++++
.../master/assignment/RegionStateStore.java | 2 +-
.../hbase/master/assignment/RegionStates.java | 472 ++----------
.../assignment/RegionTransitionProcedure.java | 347 +--------
.../hbase/master/assignment/ServerState.java | 55 ++
.../master/assignment/ServerStateNode.java | 128 ++++
.../assignment/SplitTableRegionProcedure.java | 123 ++-
.../assignment/TransitRegionStateProcedure.java | 569 ++++++++++++++
.../master/assignment/UnassignProcedure.java | 238 +-----
.../hadoop/hbase/master/assignment/Util.java | 72 --
.../AbstractStateMachineRegionProcedure.java | 7 +-
.../AbstractStateMachineTableProcedure.java | 27 +-
.../master/procedure/CreateTableProcedure.java | 2 +-
.../master/procedure/DeleteTableProcedure.java | 12 +-
.../master/procedure/DisableTableProcedure.java | 53 +-
.../master/procedure/EnableTableProcedure.java | 3 +-
.../master/procedure/InitMetaProcedure.java | 7 +-
.../master/procedure/RecoverMetaProcedure.java | 199 +----
.../procedure/ReopenTableRegionsProcedure.java | 35 +-
.../master/procedure/ServerCrashProcedure.java | 146 ++--
.../procedure/TruncateTableProcedure.java | 7 +-
.../hbase/master/TestAssignmentListener.java | 294 --------
.../master/TestMasterAbortAndRSGotKilled.java | 67 +-
.../TestMergeTableRegionsWhileRSCrash.java | 9 +-
...stServerCrashProcedureCarryingMetaStuck.java | 9 +-
.../master/TestServerCrashProcedureStuck.java | 9 +-
.../master/TestSplitRegionWhileRSCrash.java | 7 +-
.../master/assignment/MockMasterServices.java | 2 +-
.../assignment/TestAMAssignWithRandExec.java | 53 ++
.../assignment/TestAMServerFailedOpen.java | 134 ++++
.../assignment/TestAssignmentManager.java | 699 +----------------
.../assignment/TestAssignmentManagerBase.java | 586 +++++++++++++++
.../assignment/TestAssignmentManagerUtil.java | 134 ++++
.../assignment/TestCloseRegionWhileRSCrash.java | 237 ++++++
.../TestMergeTableRegionsProcedure.java | 32 +-
.../TestSplitTableRegionProcedure.java | 9 +-
.../TestTransitRegionStateProcedure.java | 164 ++++
.../TestUnexpectedStateException.java | 167 ----
.../TestFavoredStochasticLoadBalancer.java | 2 +-
.../MasterProcedureTestingUtility.java | 24 +-
.../procedure/TestCloneSnapshotProcedure.java | 29 +-
.../procedure/TestEnableTableProcedure.java | 3 +-
.../procedure/TestRecoverMetaProcedure.java | 109 ---
.../procedure/TestServerCrashProcedure.java | 4 +
.../procedure/TestTruncateTableProcedure.java | 5 +-
.../master/snapshot/TestAssignProcedure.java | 216 ------
.../hbase/regionserver/TestRegionMove.java | 17 +-
.../TestSplitTransactionOnCluster.java | 105 +--
67 files changed, 4004 insertions(+), 4084 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/bb349413/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java
index 7289ce8..745e1ea 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/master/RegionState.java
@@ -51,9 +51,13 @@ public class RegionState {
SPLITTING_NEW, // new region to be created when RS splits a parent
// region but hasn't be created yet, or master doesn't
// know it's already created
- MERGING_NEW; // new region to be created when RS merges two
+ MERGING_NEW, // new region to be created when RS merges two
// daughter regions but hasn't be created yet, or
// master doesn't know it's already created
+ ABNORMALLY_CLOSED; // the region is CLOSED because of a RS crash. Usually it is the same
+ // with CLOSED, but for some operations such as merge/split, we can not
+ // apply it to a region in this state, as it may lead to data loss as we
+ // may have some data in recovered edits.
/**
* Convert to protobuf ClusterStatusProtos.RegionState.State
@@ -61,47 +65,50 @@ public class RegionState {
public ClusterStatusProtos.RegionState.State convert() {
ClusterStatusProtos.RegionState.State rs;
switch (this) {
- case OFFLINE:
- rs = ClusterStatusProtos.RegionState.State.OFFLINE;
- break;
- case OPENING:
- rs = ClusterStatusProtos.RegionState.State.OPENING;
- break;
- case OPEN:
- rs = ClusterStatusProtos.RegionState.State.OPEN;
- break;
- case CLOSING:
- rs = ClusterStatusProtos.RegionState.State.CLOSING;
- break;
- case CLOSED:
- rs = ClusterStatusProtos.RegionState.State.CLOSED;
- break;
- case SPLITTING:
- rs = ClusterStatusProtos.RegionState.State.SPLITTING;
- break;
- case SPLIT:
- rs = ClusterStatusProtos.RegionState.State.SPLIT;
- break;
- case FAILED_OPEN:
- rs = ClusterStatusProtos.RegionState.State.FAILED_OPEN;
- break;
- case FAILED_CLOSE:
- rs = ClusterStatusProtos.RegionState.State.FAILED_CLOSE;
- break;
- case MERGING:
- rs = ClusterStatusProtos.RegionState.State.MERGING;
- break;
- case MERGED:
- rs = ClusterStatusProtos.RegionState.State.MERGED;
- break;
- case SPLITTING_NEW:
- rs = ClusterStatusProtos.RegionState.State.SPLITTING_NEW;
- break;
- case MERGING_NEW:
- rs = ClusterStatusProtos.RegionState.State.MERGING_NEW;
- break;
- default:
- throw new IllegalStateException("");
+ case OFFLINE:
+ rs = ClusterStatusProtos.RegionState.State.OFFLINE;
+ break;
+ case OPENING:
+ rs = ClusterStatusProtos.RegionState.State.OPENING;
+ break;
+ case OPEN:
+ rs = ClusterStatusProtos.RegionState.State.OPEN;
+ break;
+ case CLOSING:
+ rs = ClusterStatusProtos.RegionState.State.CLOSING;
+ break;
+ case CLOSED:
+ rs = ClusterStatusProtos.RegionState.State.CLOSED;
+ break;
+ case SPLITTING:
+ rs = ClusterStatusProtos.RegionState.State.SPLITTING;
+ break;
+ case SPLIT:
+ rs = ClusterStatusProtos.RegionState.State.SPLIT;
+ break;
+ case FAILED_OPEN:
+ rs = ClusterStatusProtos.RegionState.State.FAILED_OPEN;
+ break;
+ case FAILED_CLOSE:
+ rs = ClusterStatusProtos.RegionState.State.FAILED_CLOSE;
+ break;
+ case MERGING:
+ rs = ClusterStatusProtos.RegionState.State.MERGING;
+ break;
+ case MERGED:
+ rs = ClusterStatusProtos.RegionState.State.MERGED;
+ break;
+ case SPLITTING_NEW:
+ rs = ClusterStatusProtos.RegionState.State.SPLITTING_NEW;
+ break;
+ case MERGING_NEW:
+ rs = ClusterStatusProtos.RegionState.State.MERGING_NEW;
+ break;
+ case ABNORMALLY_CLOSED:
+ rs = ClusterStatusProtos.RegionState.State.ABNORMALLY_CLOSED;
+ break;
+ default:
+ throw new IllegalStateException("");
}
return rs;
}
@@ -114,49 +121,52 @@ public class RegionState {
public static State convert(ClusterStatusProtos.RegionState.State protoState) {
State state;
switch (protoState) {
- case OFFLINE:
- state = OFFLINE;
- break;
- case PENDING_OPEN:
- case OPENING:
- state = OPENING;
- break;
- case OPEN:
- state = OPEN;
- break;
- case PENDING_CLOSE:
- case CLOSING:
- state = CLOSING;
- break;
- case CLOSED:
- state = CLOSED;
- break;
- case SPLITTING:
- state = SPLITTING;
- break;
- case SPLIT:
- state = SPLIT;
- break;
- case FAILED_OPEN:
- state = FAILED_OPEN;
- break;
- case FAILED_CLOSE:
- state = FAILED_CLOSE;
- break;
- case MERGING:
- state = MERGING;
- break;
- case MERGED:
- state = MERGED;
- break;
- case SPLITTING_NEW:
- state = SPLITTING_NEW;
- break;
- case MERGING_NEW:
- state = MERGING_NEW;
- break;
- default:
- throw new IllegalStateException("Unhandled state " + protoState);
+ case OFFLINE:
+ state = OFFLINE;
+ break;
+ case PENDING_OPEN:
+ case OPENING:
+ state = OPENING;
+ break;
+ case OPEN:
+ state = OPEN;
+ break;
+ case PENDING_CLOSE:
+ case CLOSING:
+ state = CLOSING;
+ break;
+ case CLOSED:
+ state = CLOSED;
+ break;
+ case SPLITTING:
+ state = SPLITTING;
+ break;
+ case SPLIT:
+ state = SPLIT;
+ break;
+ case FAILED_OPEN:
+ state = FAILED_OPEN;
+ break;
+ case FAILED_CLOSE:
+ state = FAILED_CLOSE;
+ break;
+ case MERGING:
+ state = MERGING;
+ break;
+ case MERGED:
+ state = MERGED;
+ break;
+ case SPLITTING_NEW:
+ state = SPLITTING_NEW;
+ break;
+ case MERGING_NEW:
+ state = MERGING_NEW;
+ break;
+ case ABNORMALLY_CLOSED:
+ state = ABNORMALLY_CLOSED;
+ break;
+ default:
+ throw new IllegalStateException("Unhandled state " + protoState);
}
return state;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/bb349413/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 695c7b0..fe97404 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
@@ -94,6 +95,7 @@ public class ProcedureExecutor<TEnvironment> {
* Class with parameters describing how to fail/die when in testing-context.
*/
public static class Testing {
+ protected boolean killIfHasParent = true;
protected boolean killIfSuspended = false;
/**
@@ -120,8 +122,14 @@ public class ProcedureExecutor<TEnvironment> {
return kill;
}
- protected boolean shouldKillBeforeStoreUpdate(final boolean isSuspended) {
- return (isSuspended && !killIfSuspended) ? false : shouldKillBeforeStoreUpdate();
+ protected boolean shouldKillBeforeStoreUpdate(boolean isSuspended, boolean hasParent) {
+ if (isSuspended && !killIfSuspended) {
+ return false;
+ }
+ if (hasParent && !killIfHasParent) {
+ return false;
+ }
+ return shouldKillBeforeStoreUpdate();
}
protected boolean shouldKillAfterStoreUpdate() {
@@ -457,6 +465,7 @@ public class ProcedureExecutor<TEnvironment> {
int failedCount = 0;
while (procIter.hasNext()) {
boolean finished = procIter.isNextFinished();
+ @SuppressWarnings("unchecked")
Procedure<TEnvironment> proc = procIter.next();
NonceKey nonceKey = proc.getNonceKey();
long procId = proc.getProcId();
@@ -508,6 +517,7 @@ public class ProcedureExecutor<TEnvironment> {
continue;
}
+ @SuppressWarnings("unchecked")
Procedure<TEnvironment> proc = procIter.next();
assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
@@ -1180,6 +1190,17 @@ public class ProcedureExecutor<TEnvironment> {
}
/**
+ * Should only be used when starting up, where the procedure workers have not been started.
+ * <p/>
+ * If the procedure workers have been started, the returned values may change while you are
+ * processing it so usually this is not safe. Use {@link #getProcedures()} below for most cases as
+ * it will do a copy, and also include the finished procedures.
+ */
+ public Collection<Procedure<TEnvironment>> getActiveProceduresNoCopy() {
+ return procedures.values();
+ }
+
+ /**
* Get procedures.
* @return the procedures in a list
*/
@@ -1607,7 +1628,8 @@ public class ProcedureExecutor<TEnvironment> {
// allows to kill the executor before something is stored to the wal.
// useful to test the procedure recovery.
- if (testing != null && testing.shouldKillBeforeStoreUpdate(suspended)) {
+ if (testing != null &&
+ testing.shouldKillBeforeStoreUpdate(suspended, procedure.hasParent())) {
kill("TESTING: Kill BEFORE store update: " + procedure);
}
@@ -1839,6 +1861,7 @@ public class ProcedureExecutor<TEnvironment> {
long lastUpdate = EnvironmentEdgeManager.currentTime();
try {
while (isRunning() && keepAlive(lastUpdate)) {
+ @SuppressWarnings("unchecked")
Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
if (proc == null) {
continue;
http://git-wip-us.apache.org/repos/asf/hbase/blob/bb349413/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
index 4ed82f2..986b250 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java
@@ -185,7 +185,7 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
this.cycles++;
}
- LOG.trace("{}", toString());
+ LOG.trace("{}", this);
stateFlow = executeFromState(env, state);
if (!hasMoreState()) setNextState(EOF_STATE);
if (subProcList != null && !subProcList.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/bb349413/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index 138215b..d52b6bb 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -26,7 +26,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.Callable;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -37,14 +36,16 @@ import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
-import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
-import org.apache.hbase.thirdparty.com.google.protobuf.BytesValue;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.hadoop.hbase.util.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+import org.apache.hbase.thirdparty.com.google.protobuf.BytesValue;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+
public class ProcedureTestingUtility {
private static final Logger LOG = LoggerFactory.getLogger(ProcedureTestingUtility.class);
@@ -67,7 +68,7 @@ public class ProcedureTestingUtility {
}
public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor) throws Exception {
- restart(procExecutor, false, true, null, null);
+ restart(procExecutor, false, true, null, null, null);
}
public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
@@ -76,9 +77,9 @@ public class ProcedureTestingUtility {
procExecutor.startWorkers();
}
- public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor,
- final boolean avoidTestKillDuringRestart, final boolean failOnCorrupted,
- final Callable<Void> stopAction, final Callable<Void> startAction)
+ public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
+ boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction,
+ Callable<Void> actionBeforeStartWorker, Callable<Void> startAction)
throws Exception {
final ProcedureStore procStore = procExecutor.getStore();
final int storeThreads = procExecutor.getCorePoolSize();
@@ -104,7 +105,11 @@ public class ProcedureTestingUtility {
// re-start
LOG.info("RESTART - Start");
procStore.start(storeThreads);
- initAndStartWorkers(procExecutor, execThreads, failOnCorrupted);
+ procExecutor.init(execThreads, failOnCorrupted);
+ if (actionBeforeStartWorker != null) {
+ actionBeforeStartWorker.call();
+ }
+ procExecutor.startWorkers();
if (startAction != null) {
startAction.call();
}
@@ -139,6 +144,12 @@ public class ProcedureTestingUtility {
}
}
+ public static <TEnv> void setKillIfHasParent(ProcedureExecutor<TEnv> procExecutor,
+ boolean value) {
+ createExecutorTesting(procExecutor);
+ procExecutor.testing.killIfHasParent = value;
+ }
+
public static <TEnv> void setKillIfSuspended(ProcedureExecutor<TEnv> procExecutor,
boolean value) {
createExecutorTesting(procExecutor);
http://git-wip-us.apache.org/repos/asf/hbase/blob/bb349413/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto b/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
index 399ff5e..d39db36 100644
--- a/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
@@ -52,6 +52,10 @@ message RegionState {
MERGING_NEW = 14; // new region to be created when RS merges two
// daughter regions but hasn't be created yet, or
// master doesn't know it's already created
+ ABNORMALLY_CLOSED = 15;// the region is CLOSED because of a RS crash. Usually it is the same
+ // with CLOSED, but for some operations such as merge/split, we can not
+ // apply it to a region in this state, as it may lead to data loss as we
+ // may have some data in recovered edits.
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/bb349413/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index 5227e64..e50a913 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -519,3 +519,31 @@ message ReplaySyncReplicationWALParameter {
required string peer_id = 1;
repeated string wal = 2;
}
+
+enum RegionStateTransitionState {
+ REGION_STATE_TRANSITION_GET_ASSIGN_CANDIDATE = 1;
+ REGION_STATE_TRANSITION_OPEN = 2;
+ REGION_STATE_TRANSITION_CONFIRM_OPENED = 3;
+ REGION_STATE_TRANSITION_CLOSE = 4;
+ REGION_STATE_TRANSITION_CONFIRM_CLOSED = 5;
+}
+
+message RegionStateTransitionStateData {
+ required RegionStateTransitionState initialState = 1;
+ required RegionStateTransitionState lastState = 2;
+ optional ServerName assign_candidate = 3;
+ required bool force_new_plan = 4;
+}
+
+message RegionRemoteProcedureBaseStateData {
+ required RegionInfo region = 1;
+ required ServerName target_server = 2;
+ required bool dispatched = 3;
+}
+
+message OpenRegionProcedureStateData {
+}
+
+message CloseRegionProcedureStateData {
+ optional ServerName assign_candidate = 1;
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/bb349413/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java
index b39d3a1..720b193 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java
@@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
-import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
+import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.net.Address;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/hbase/blob/bb349413/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
index 8e70f5e..ee0651b 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
@@ -64,7 +64,7 @@ import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.master.TableStateManager;
-import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
+import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
http://git-wip-us.apache.org/repos/asf/hbase/blob/bb349413/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentListener.java
deleted file mode 100644
index 84a7042..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentListener.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master;
-
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * Get notification of assignment events. The invocations are inline
- * so make sure your implementation is fast else you'll slow hbase.
- */
-@InterfaceAudience.Private
-public interface AssignmentListener {
- /**
- * The region was opened on the specified server.
- * @param regionInfo The opened region.
- * @param serverName The remote servers name.
- */
- void regionOpened(final RegionInfo regionInfo, final ServerName serverName);
-
- /**
- * The region was closed on the region server.
- * @param regionInfo The closed region.
- */
- void regionClosed(final RegionInfo regionInfo);
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/bb349413/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index abea148..50794f4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -100,10 +100,14 @@ import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.MasterRpcServices.BalanceSwitchMode;
+import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure;
+import org.apache.hadoop.hbase.master.assignment.MoveRegionProcedure;
+import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
-import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
+import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
+import org.apache.hadoop.hbase.master.assignment.UnassignProcedure;
import org.apache.hadoop.hbase.master.balancer.BalancerChore;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
@@ -130,6 +134,7 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
+import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
import org.apache.hadoop.hbase.master.replication.AbstractPeerProcedure;
@@ -213,6 +218,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
@@ -797,6 +803,45 @@ public class HMaster extends HRegionServer implements MasterServices {
this.mpmHost.initialize(this, this.metricsMaster);
}
+ private static final ImmutableSet<Class<?>> UNSUPPORTED_PROCEDURES =
+ ImmutableSet.of(RecoverMetaProcedure.class, AssignProcedure.class, UnassignProcedure.class,
+ MoveRegionProcedure.class);
+
+ /**
+ * In HBASE-20881, we have introduced a new TRSP to assign/unassign/move regions, and it is
+ * incompatible with the old AssignProcedure/UnassignProcedure/MoveRegionProcedure. So we need to
+ * make sure that none of these procedures exist when upgrading. If any do, the master will
+ * quit, and you need to go back to the old version to finish these procedures before upgrading.
+ */
+ private void checkUnsupportedProcedure(
+ Map<Class<? extends Procedure>, List<Procedure<MasterProcedureEnv>>> procsByType)
+ throws HBaseIOException {
+ // Confirm that we do not have unfinished assign/unassign related procedures. It is not easy to
+ // support both the old assign/unassign procedures and the new TransitRegionStateProcedure as
+ // there will be conflict in the code for AM. We should finish all these procedures before
+ // upgrading.
+ for (Class<?> clazz : UNSUPPORTED_PROCEDURES) {
+ List<Procedure<MasterProcedureEnv>> procs = procsByType.get(clazz);
+ if (procs != null) {
+ LOG.error(
+ "Unsupported procedure type {} found, please rollback your master to the old" +
+ " version to finish them, and then try to upgrade again. The full procedure list: {}",
+ clazz, procs);
+ throw new HBaseIOException("Unsupported procedure type " + clazz + " found");
+ }
+ }
+ // A special check for SCP, as we do not support RecoverMetaProcedure any more so we need to
+ // make sure that no one will try to schedule it but SCP does have a state which will schedule
+ // it.
+ if (procsByType.getOrDefault(ServerCrashProcedure.class, Collections.emptyList()).stream()
+ .map(p -> (ServerCrashProcedure) p).anyMatch(ServerCrashProcedure::isInRecoverMetaState)) {
+ LOG.error("At least one ServerCrashProcedure is going to schedule a RecoverMetaProcedure," +
+ " which is not supported any more. Please rollback your master to the old version to" +
+ " finish them, and then try to upgrade again.");
+ throw new HBaseIOException("Unsupported procedure state found for ServerCrashProcedure");
+ }
+ }
+
/**
* Finish initialization of HMaster after becoming the primary master.
* <p/>
@@ -870,24 +915,39 @@ public class HMaster extends HRegionServer implements MasterServices {
ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
this.clusterId = clusterId.toString();
-
-
status.setStatus("Initialze ServerManager and schedule SCP for crash servers");
this.serverManager = createServerManager(this);
createProcedureExecutor();
+ @SuppressWarnings("rawtypes")
+ Map<Class<? extends Procedure>, List<Procedure<MasterProcedureEnv>>> procsByType =
+ procedureExecutor.getActiveProceduresNoCopy().stream()
+ .collect(Collectors.groupingBy(p -> p.getClass()));
+
+ checkUnsupportedProcedure(procsByType);
+
// Create Assignment Manager
this.assignmentManager = new AssignmentManager(this);
this.assignmentManager.start();
+ // TODO: TRSP can perform as the sub procedure for other procedures, so even if it is marked as
+ // completed, it could still be in the procedure list. This is a bit strange but is another
+ // story, need to verify the implementation for ProcedureExecutor and ProcedureStore.
+ List<TransitRegionStateProcedure> ritList =
+ procsByType.getOrDefault(TransitRegionStateProcedure.class, Collections.emptyList()).stream()
+ .filter(p -> !p.isFinished()).map(p -> (TransitRegionStateProcedure) p)
+ .collect(Collectors.toList());
+ this.assignmentManager.setupRIT(ritList);
+
this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager);
this.regionServerTracker.start(
- procedureExecutor.getProcedures().stream().filter(p -> p instanceof ServerCrashProcedure)
- .map(p -> ((ServerCrashProcedure) p).getServerName()).collect(Collectors.toSet()),
+ procsByType.getOrDefault(ServerCrashProcedure.class, Collections.emptyList()).stream()
+ .map(p -> (ServerCrashProcedure) p).map(p -> p.getServerName()).collect(Collectors.toSet()),
walManager.getLiveServersFromWALDir());
// This manager will be started AFTER hbase:meta is confirmed on line.
// hbase.mirror.table.state.to.zookeeper is so hbase1 clients can connect. They read table
// state from zookeeper while hbase2 reads it from hbase:meta. Disable if no hbase1 clients.
this.tableStateManager =
- this.conf.getBoolean(MirroringTableStateManager.MIRROR_TABLE_STATE_TO_ZK_KEY, true)?
+ this.conf.getBoolean(MirroringTableStateManager.MIRROR_TABLE_STATE_TO_ZK_KEY, true)
+ ?
new MirroringTableStateManager(this):
new TableStateManager(this);
@@ -3577,7 +3637,6 @@ public class HMaster extends HRegionServer implements MasterServices {
* Mark region server(s) as decommissioned (previously called 'draining') to prevent additional
* regions from getting assigned to them. Also unload the regions on the servers asynchronously.0
* @param servers Region servers to decommission.
- * @throws HBaseIOException
*/
public void decommissionRegionServers(final List<ServerName> servers, final boolean offload)
throws HBaseIOException {
@@ -3590,7 +3649,7 @@ public class HMaster extends HRegionServer implements MasterServices {
ZKUtil.createAndFailSilent(getZooKeeper(), node);
} catch (KeeperException ke) {
throw new HBaseIOException(
- this.zooKeeper.prefix("Unable to decommission '" + server.getServerName() + "'."), ke);
+ this.zooKeeper.prefix("Unable to decommission '" + server.getServerName() + "'."), ke);
}
if (this.serverManager.addServerToDrainList(server)) {
serversAdded.add(server);
@@ -3601,7 +3660,7 @@ public class HMaster extends HRegionServer implements MasterServices {
final List<ServerName> destServers = this.serverManager.createDestinationServersList();
for (ServerName server : serversAdded) {
final List<RegionInfo> regionsOnServer =
- this.assignmentManager.getRegionStates().getServerRegionInfoSet(server);
+ this.assignmentManager.getRegionStates().getServerRegionInfoSet(server);
for (RegionInfo hri : regionsOnServer) {
ServerName dest = balancer.randomAssignment(hri, destServers);
if (dest == null) {
@@ -3627,10 +3686,9 @@ public class HMaster extends HRegionServer implements MasterServices {
* Remove decommission marker (previously called 'draining') from a region server to allow regions
* assignments. Load regions onto the server asynchronously if a list of regions is given
* @param server Region server to remove decommission marker from.
- * @throws HBaseIOException
*/
public void recommissionRegionServer(final ServerName server,
- final List<byte[]> encodedRegionNames) throws HBaseIOException {
+ final List<byte[]> encodedRegionNames) throws IOException {
// Remove the server from decommissioned (draining) server list.
String parentZnode = getZooKeeper().getZNodePaths().drainingZNode;
String node = ZNodePaths.joinZNode(parentZnode, server.getServerName());
@@ -3638,7 +3696,7 @@ public class HMaster extends HRegionServer implements MasterServices {
ZKUtil.deleteNodeFailSilent(getZooKeeper(), node);
} catch (KeeperException ke) {
throw new HBaseIOException(
- this.zooKeeper.prefix("Unable to recommission '" + server.getServerName() + "'."), ke);
+ this.zooKeeper.prefix("Unable to recommission '" + server.getServerName() + "'."), ke);
}
this.serverManager.removeServerFromDrainList(server);
@@ -3651,15 +3709,15 @@ public class HMaster extends HRegionServer implements MasterServices {
}
for (byte[] encodedRegionName : encodedRegionNames) {
RegionState regionState =
- assignmentManager.getRegionStates().getRegionState(Bytes.toString(encodedRegionName));
+ assignmentManager.getRegionStates().getRegionState(Bytes.toString(encodedRegionName));
if (regionState == null) {
LOG.warn("Unknown region " + Bytes.toStringBinary(encodedRegionName));
continue;
}
RegionInfo hri = regionState.getRegion();
if (server.equals(regionState.getServerName())) {
- LOG.info("Skipping move of region " + hri.getRegionNameAsString()
- + " because region already assigned to the same server " + server + ".");
+ LOG.info("Skipping move of region " + hri.getRegionNameAsString() +
+ " because region already assigned to the same server " + server + ".");
continue;
}
RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), server);
http://git-wip-us.apache.org/repos/asf/hbase/blob/bb349413/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterDumpServlet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterDumpServlet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterDumpServlet.java
index 0dd50ff..ec8e523 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterDumpServlet.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterDumpServlet.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
-import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
+import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.monitoring.LogMonitoring;
import org.apache.hadoop.hbase.monitoring.StateDumpServlet;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
http://git-wip-us.apache.org/repos/asf/hbase/blob/bb349413/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java
index 55aee4a..33a3545 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignProcedure.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,20 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.master.assignment;
import java.io.IOException;
-import java.util.Comparator;
-
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
-import org.apache.hadoop.hbase.master.RegionState.State;
-import org.apache.hadoop.hbase.master.TableStateManager;
-import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionOpenOperation;
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
@@ -37,75 +27,27 @@ import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.AssignRegionStateData;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
/**
- * Procedure that describe the assignment of a single region.
- * There can only be one RegionTransitionProcedure per region running at a time
- * since each procedure takes a lock on the region.
- *
- * <p>The Assign starts by pushing the "assign" operation to the AssignmentManager
- * and then will go in a "waiting" state.
- * The AM will batch the "assign" requests and ask the Balancer where to put
- * the region (the various policies will be respected: retain, round-robin, random).
- * Once the AM and the balancer have found a place for the region the procedure
- * will be resumed and an "open region" request will be placed in the Remote Dispatcher
- * queue, and the procedure once again will go in a "waiting state".
- * The Remote Dispatcher will batch the various requests for that server and
- * they will be sent to the RS for execution.
- * The RS will complete the open operation by calling master.reportRegionStateTransition().
- * The AM will intercept the transition report, and notify the procedure.
- * The procedure will finish the assignment by publishing to new state on meta
- * or it will retry the assignment.
- *
- * <p>This procedure does not rollback when beyond the first
- * REGION_TRANSITION_QUEUE step; it will press on trying to assign in the face of
- * failure. Should we ignore rollback calls to Assign/Unassign then? Or just
- * remove rollback here?
+ * Left here only so that we can check whether the master can be started successfully.
+ * @deprecated Do not use any more.
+ * @see TransitRegionStateProcedure
*/
// TODO: Add being able to assign a region to open read-only.
+@Deprecated
@InterfaceAudience.Private
public class AssignProcedure extends RegionTransitionProcedure {
- private static final Logger LOG = LoggerFactory.getLogger(AssignProcedure.class);
- /**
- * Set to true when we need recalibrate -- choose a new target -- because original assign failed.
- */
private boolean forceNewPlan = false;
- /**
- * Gets set as desired target on move, merge, etc., when we want to go to a particular server.
- * We may not be able to respect this request but will try. When it is NOT set, then we ask
- * the balancer to assign. This value is used below in startTransition to set regionLocation if
- * non-null. Setting regionLocation in regionServerNode is how we override balancer setting
- * destination.
- */
protected volatile ServerName targetServer;
- /**
- * Comparator that will sort AssignProcedures so meta assigns come first, then system table
- * assigns and finally user space assigns.
- */
- public static final CompareAssignProcedure COMPARATOR = new CompareAssignProcedure();
-
public AssignProcedure() {
- // Required by the Procedure framework to create the procedure on replay
- super();
- }
-
- public AssignProcedure(final RegionInfo regionInfo) {
- super(regionInfo);
- this.targetServer = null;
- }
-
- public AssignProcedure(final RegionInfo regionInfo, final ServerName destinationServer) {
- super(regionInfo);
- this.targetServer = destinationServer;
}
@Override
@@ -124,10 +66,9 @@ public class AssignProcedure extends RegionTransitionProcedure {
}
@Override
- protected void serializeStateData(ProcedureStateSerializer serializer)
- throws IOException {
- final AssignRegionStateData.Builder state = AssignRegionStateData.newBuilder()
- .setTransitionState(getTransitionState())
+ protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ final AssignRegionStateData.Builder state =
+ AssignRegionStateData.newBuilder().setTransitionState(getTransitionState())
.setRegionInfo(ProtobufUtil.toRegionInfo(getRegionInfo()));
if (forceNewPlan) {
state.setForceNewPlan(true);
@@ -142,8 +83,7 @@ public class AssignProcedure extends RegionTransitionProcedure {
}
@Override
- protected void deserializeStateData(ProcedureStateSerializer serializer)
- throws IOException {
+ protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
final AssignRegionStateData state = serializer.deserialize(AssignRegionStateData.class);
setTransitionState(state.getTransitionState());
setRegionInfo(ProtobufUtil.toRegionInfo(state.getRegionInfo()));
@@ -159,202 +99,36 @@ public class AssignProcedure extends RegionTransitionProcedure {
@Override
protected boolean startTransition(final MasterProcedureEnv env, final RegionStateNode regionNode)
throws IOException {
- // If the region is already open we can't do much...
- if (regionNode.isInState(State.OPEN) && isServerOnline(env, regionNode)) {
- LOG.info("Assigned, not reassigning; " + this + "; " + regionNode.toShortString());
- return false;
- }
- // Don't assign if table is in disabling or disabled state.
- TableStateManager tsm = env.getMasterServices().getTableStateManager();
- TableName tn = regionNode.getRegionInfo().getTable();
- if (tsm.getTableState(tn).isDisabledOrDisabling()) {
- LOG.info("Table " + tn + " state=" + tsm.getTableState(tn) + ", skipping " + this);
- return false;
- }
- // If the region is SPLIT, we can't assign it. But state might be CLOSED, rather than
- // SPLIT which is what a region gets set to when unassigned as part of SPLIT. FIX.
- if (regionNode.isInState(State.SPLIT) ||
- (regionNode.getRegionInfo().isOffline() && regionNode.getRegionInfo().isSplit())) {
- LOG.info("SPLIT, cannot be assigned; " + this + "; " + regionNode +
- "; hri=" + regionNode.getRegionInfo());
- return false;
- }
-
- // If we haven't started the operation yet, we can abort
- if (aborted.get() && regionNode.isInState(State.CLOSED, State.OFFLINE)) {
- if (incrementAndCheckMaxAttempts(env, regionNode)) {
- regionNode.setState(State.FAILED_OPEN);
- setFailure(getClass().getSimpleName(),
- new RetriesExhaustedException("Max attempts exceeded"));
- } else {
- setAbortFailure(getClass().getSimpleName(), "Abort requested");
- }
- return false;
- }
-
- // Send assign (add into assign-pool). We call regionNode.offline below to set state to
- // OFFLINE and to clear the region location. Setting a new regionLocation here is how we retain
- // old assignment or specify target server if a move or merge. See
- // AssignmentManager#processAssignQueue. Otherwise, balancer gives us location.
- // TODO: Region will be set into OFFLINE state below regardless of what its previous state was
- // This is dangerous? Wrong? What if region was in an unexpected state?
- ServerName lastRegionLocation = regionNode.offline();
- boolean retain = false;
- if (!forceNewPlan) {
- if (this.targetServer != null) {
- retain = targetServer.equals(lastRegionLocation);
- regionNode.setRegionLocation(targetServer);
- } else {
- if (lastRegionLocation != null) {
- // Try and keep the location we had before we offlined.
- retain = true;
- regionNode.setRegionLocation(lastRegionLocation);
- } else if (regionNode.getLastHost() != null) {
- retain = true;
- LOG.info("Setting lastHost as the region location " + regionNode.getLastHost());
- regionNode.setRegionLocation(regionNode.getLastHost());
- }
- }
- }
- LOG.info("Starting " + this + "; " + regionNode.toShortString() +
- "; forceNewPlan=" + this.forceNewPlan +
- ", retain=" + retain);
- env.getAssignmentManager().queueAssign(regionNode);
return true;
}
@Override
protected boolean updateTransition(final MasterProcedureEnv env, final RegionStateNode regionNode)
- throws IOException, ProcedureSuspendedException {
- // TODO: crash if destinationServer is specified and not online
- // which is also the case when the balancer provided us with a different location.
- if (LOG.isTraceEnabled()) {
- LOG.trace("Update " + this + "; " + regionNode.toShortString());
- }
- if (regionNode.getRegionLocation() == null) {
- setTransitionState(RegionTransitionState.REGION_TRANSITION_QUEUE);
- return true;
- }
-
- if (!isServerOnline(env, regionNode)) {
- // TODO: is this correct? should we wait the chore/ssh?
- LOG.info("Server not online, re-queuing " + this + "; " + regionNode.toShortString());
- setTransitionState(RegionTransitionState.REGION_TRANSITION_QUEUE);
- return true;
- }
-
- if (env.getAssignmentManager().waitServerReportEvent(regionNode.getRegionLocation(), this)) {
- LOG.info("Early suspend! " + this + "; " + regionNode.toShortString());
- throw new ProcedureSuspendedException();
- }
-
- if (regionNode.isInState(State.OPEN)) {
- LOG.info("Already assigned: " + this + "; " + regionNode.toShortString());
- return false;
- }
-
- // Transition regionNode State. Set it to OPENING. Update hbase:meta, and add
- // region to list of regions on the target regionserver. Need to UNDO if failure!
- env.getAssignmentManager().markRegionAsOpening(regionNode);
-
- // TODO: Requires a migration to be open by the RS?
- // regionNode.getFormatVersion()
-
- if (!addToRemoteDispatcher(env, regionNode.getRegionLocation())) {
- // Failed the dispatch BUT addToRemoteDispatcher internally does
- // cleanup on failure -- even the undoing of markRegionAsOpening above --
- // so nothing more to do here; in fact we need to get out of here
- // fast since we've been put back on the scheduler.
- }
-
- // We always return true, even if we fail dispatch because addToRemoteDispatcher
- // failure processing sets state back to REGION_TRANSITION_QUEUE so we try again;
- // i.e. return true to keep the Procedure running; it has been reset to startover.
+ throws IOException, ProcedureSuspendedException {
return true;
}
@Override
protected void finishTransition(final MasterProcedureEnv env, final RegionStateNode regionNode)
throws IOException {
- env.getAssignmentManager().markRegionAsOpened(regionNode);
- // This success may have been after we failed open a few times. Be sure to cleanup any
- // failed open references. See #incrementAndCheckMaxAttempts and where it is called.
- env.getAssignmentManager().getRegionStates().removeFromFailedOpen(regionNode.getRegionInfo());
}
@Override
protected void reportTransition(final MasterProcedureEnv env, final RegionStateNode regionNode,
final TransitionCode code, final long openSeqNum) throws UnexpectedStateException {
- switch (code) {
- case OPENED:
- if (openSeqNum < 0) {
- throw new UnexpectedStateException("Received report unexpected " + code +
- " transition openSeqNum=" + openSeqNum + ", " + regionNode);
- }
- if (openSeqNum < regionNode.getOpenSeqNum()) {
- // Don't bother logging if openSeqNum == 0
- if (openSeqNum != 0) {
- LOG.warn("Skipping update of open seqnum with " + openSeqNum +
- " because current seqnum=" + regionNode.getOpenSeqNum());
- }
- } else {
- regionNode.setOpenSeqNum(openSeqNum);
- }
- // Leave the state here as OPENING for now. We set it to OPEN in
- // REGION_TRANSITION_FINISH section where we do a bunch of checks.
- // regionNode.setState(RegionState.State.OPEN, RegionState.State.OPENING);
- setTransitionState(RegionTransitionState.REGION_TRANSITION_FINISH);
- break;
- case FAILED_OPEN:
- handleFailure(env, regionNode);
- break;
- default:
- throw new UnexpectedStateException("Received report unexpected " + code +
- " transition openSeqNum=" + openSeqNum + ", " + regionNode.toShortString() +
- ", " + this + ", expected OPENED or FAILED_OPEN.");
- }
- }
-
- /**
- * Called when dispatch or subsequent OPEN request fail. Can be run by the
- * inline dispatch call or later by the ServerCrashProcedure. Our state is
- * generally OPENING. Cleanup and reset to OFFLINE and put our Procedure
- * State back to REGION_TRANSITION_QUEUE so the Assign starts over.
- */
- private void handleFailure(final MasterProcedureEnv env, final RegionStateNode regionNode) {
- if (incrementAndCheckMaxAttempts(env, regionNode)) {
- aborted.set(true);
- }
- this.forceNewPlan = true;
- this.targetServer = null;
- regionNode.offline();
- // We were moved to OPENING state before dispatch. Undo. It is safe to call
- // this method because it checks for OPENING first.
- env.getAssignmentManager().undoRegionAsOpening(regionNode);
- setTransitionState(RegionTransitionState.REGION_TRANSITION_QUEUE);
- }
-
- private boolean incrementAndCheckMaxAttempts(final MasterProcedureEnv env,
- final RegionStateNode regionNode) {
- final int retries = env.getAssignmentManager().getRegionStates().
- addToFailedOpen(regionNode).incrementAndGetRetries();
- int max = env.getAssignmentManager().getAssignMaxAttempts();
- LOG.info("Retry=" + retries + " of max=" + max + "; " +
- this + "; " + regionNode.toShortString());
- return retries >= max;
}
@Override
- public RemoteOperation remoteCallBuild(final MasterProcedureEnv env, final ServerName serverName) {
+ public RemoteOperation remoteCallBuild(final MasterProcedureEnv env,
+ final ServerName serverName) {
assert serverName.equals(getRegionState(env).getRegionLocation());
return new RegionOpenOperation(this, getRegionInfo(),
- env.getAssignmentManager().getFavoredNodes(getRegionInfo()), false);
+ env.getAssignmentManager().getFavoredNodes(getRegionInfo()), false);
}
@Override
protected boolean remoteCallFailed(final MasterProcedureEnv env, final RegionStateNode regionNode,
final IOException exception) {
- handleFailure(env, regionNode);
return true;
}
@@ -365,43 +139,7 @@ public class AssignProcedure extends RegionTransitionProcedure {
}
@Override
- public ServerName getServer(final MasterProcedureEnv env) {
- RegionStateNode node =
- env.getAssignmentManager().getRegionStates().getRegionStateNode(this.getRegionInfo());
- if (node == null) return null;
- return node.getRegionLocation();
- }
-
- @Override
protected ProcedureMetrics getProcedureMetrics(MasterProcedureEnv env) {
return env.getAssignmentManager().getAssignmentManagerMetrics().getAssignProcMetrics();
}
-
- /**
- * Sort AssignProcedures such that meta and system assigns come first before user-space assigns.
- * Have to do it this way w/ distinct Comparator because Procedure is already Comparable on
- * 'Env'(?).
- */
- public static class CompareAssignProcedure implements Comparator<AssignProcedure> {
- @Override
- public int compare(AssignProcedure left, AssignProcedure right) {
- if (left.getRegionInfo().isMetaRegion()) {
- if (right.getRegionInfo().isMetaRegion()) {
- return RegionInfo.COMPARATOR.compare(left.getRegionInfo(), right.getRegionInfo());
- }
- return -1;
- } else if (right.getRegionInfo().isMetaRegion()) {
- return +1;
- }
- if (left.getRegionInfo().getTable().isSystemTable()) {
- if (right.getRegionInfo().getTable().isSystemTable()) {
- return RegionInfo.COMPARATOR.compare(left.getRegionInfo(), right.getRegionInfo());
- }
- return -1;
- } else if (right.getRegionInfo().getTable().isSystemTable()) {
- return +1;
- }
- return RegionInfo.COMPARATOR.compare(left.getRegionInfo(), right.getRegionInfo());
- }
- }
}