You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2018/08/03 23:41:11 UTC
[2/2] hbase git commit: HBASE-21004 Backport to branch-2.0
HBASE-20708 "Remove the usage of RecoverMetaProcedure"
HBASE-21004 Backport to branch-2.0 HBASE-20708 "Remove the usage of RecoverMetaProcedure"
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/013ea3e3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/013ea3e3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/013ea3e3
Branch: refs/heads/branch-2.0
Commit: 013ea3e3d25ebf2afa21d93e12065c80deb8bcb5
Parents: 7a16237
Author: zhangduo <zh...@apache.org>
Authored: Mon Jun 18 20:24:32 2018 +0800
Committer: Michael Stack <st...@apache.org>
Committed: Fri Aug 3 16:41:02 2018 -0700
----------------------------------------------------------------------
.../apache/hadoop/hbase/MetaTableAccessor.java | 10 +-
.../hbase/procedure2/ProcedureExecutor.java | 45 +--
.../procedure2/ProcedureTestingUtility.java | 10 +-
.../hbase/procedure2/TestChildProcedures.java | 4 +-
.../hbase/procedure2/TestProcedureEvents.java | 4 +-
.../procedure2/TestProcedureExecution.java | 4 +-
.../hbase/procedure2/TestProcedureExecutor.java | 5 +-
.../procedure2/TestProcedureInMemoryChore.java | 5 +-
.../hbase/procedure2/TestProcedureMetrics.java | 2 +-
.../hbase/procedure2/TestProcedureNonce.java | 4 +-
.../hbase/procedure2/TestProcedureRecovery.java | 4 +-
.../procedure2/TestProcedureReplayOrder.java | 6 +-
.../procedure2/TestProcedureSuspended.java | 4 +-
.../procedure2/TestStateMachineProcedure.java | 4 +-
.../hbase/procedure2/TestYieldProcedures.java | 6 +-
.../src/main/protobuf/MasterProcedure.proto | 15 +-
.../hadoop/hbase/master/CatalogJanitor.java | 13 +-
.../org/apache/hadoop/hbase/master/HMaster.java | 202 +++++++------
.../hbase/master/MasterMetaBootstrap.java | 41 +--
.../hadoop/hbase/master/MasterServices.java | 12 -
.../hadoop/hbase/master/MasterWalManager.java | 28 +-
.../hbase/master/RegionServerTracker.java | 19 +-
.../hadoop/hbase/master/ServerManager.java | 163 ++---------
.../master/assignment/AssignmentManager.java | 293 +++++++------------
.../assignment/MergeTableRegionsProcedure.java | 2 +-
.../master/assignment/RegionStateStore.java | 17 +-
.../assignment/RegionTransitionProcedure.java | 11 +-
.../assignment/SplitTableRegionProcedure.java | 2 +-
.../AbstractStateMachineTableProcedure.java | 5 +-
.../master/procedure/InitMetaProcedure.java | 115 ++++++++
.../master/procedure/MasterProcedureEnv.java | 16 +-
.../procedure/MasterProcedureScheduler.java | 26 +-
.../procedure/MetaProcedureInterface.java | 5 +
.../hbase/master/procedure/MetaQueue.java | 5 +
.../master/procedure/RecoverMetaProcedure.java | 9 +-
.../hbase/master/procedure/SchemaLocking.java | 4 +
.../master/procedure/ServerCrashProcedure.java | 61 ++--
.../hadoop/hbase/TestMetaTableAccessor.java | 2 -
.../hbase/master/MockNoopMasterServices.java | 20 +-
.../hadoop/hbase/master/TestCatalogJanitor.java | 3 +-
.../master/assignment/MockMasterServices.java | 25 +-
.../assignment/TestAssignmentManager.java | 17 +-
.../MasterProcedureTestingUtility.java | 5 -
.../procedure/TestMasterProcedureEvents.java | 76 +----
.../procedure/TestServerCrashProcedure.java | 32 +-
45 files changed, 618 insertions(+), 743 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/013ea3e3/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index 1880a0d..e855263 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -1334,9 +1334,17 @@ public class MetaTableAccessor {
*/
public static void putsToMetaTable(final Connection connection, final List<Put> ps)
throws IOException {
+ if (ps.isEmpty()) {
+ return;
+ }
try (Table t = getMetaHTable(connection)) {
debugLogMutations(ps);
- t.put(ps);
+ // the implementation for putting a single Put is much simpler so here we do a check first.
+ if (ps.size() == 1) {
+ t.put(ps.get(0));
+ } else {
+ t.put(ps);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/013ea3e3/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 9b98c59..263b6be 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -520,21 +520,16 @@ public class ProcedureExecutor<TEnvironment> {
}
/**
- * Start the procedure executor.
- * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to
- * recover the lease, and ensure a single executor, and start the procedure
- * replay to resume and recover the previous pending and in-progress perocedures.
- *
+ * Initialize the procedure executor, but do not start workers. We will start them later.
+ * <p/>
+ * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, and
+ * ensure a single executor, and start the procedure replay to resume and recover the previous
+ * pending and in-progress procedures.
* @param numThreads number of threads available for procedure execution.
- * @param abortOnCorruption true if you want to abort your service in case
- * a corrupted procedure is found on replay. otherwise false.
+ * @param abortOnCorruption true if you want to abort your service in case a corrupted procedure
+ * is found on replay. otherwise false.
*/
- public void start(int numThreads, boolean abortOnCorruption) throws IOException {
- if (!running.compareAndSet(false, true)) {
- LOG.warn("Already running");
- return;
- }
-
+ public void init(int numThreads, boolean abortOnCorruption) throws IOException {
// We have numThreads executor + one timer thread used for timing out
// procedures and triggering periodic procedures.
this.corePoolSize = numThreads;
@@ -555,11 +550,11 @@ public class ProcedureExecutor<TEnvironment> {
long st, et;
// Acquire the store lease.
- st = EnvironmentEdgeManager.currentTime();
+ st = System.nanoTime();
store.recoverLease();
- et = EnvironmentEdgeManager.currentTime();
+ et = System.nanoTime();
LOG.info("Recovered {} lease in {}", store.getClass().getSimpleName(),
- StringUtils.humanTimeDiff(et - st));
+ StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));
// start the procedure scheduler
scheduler.start();
@@ -569,12 +564,21 @@ public class ProcedureExecutor<TEnvironment> {
// The first one will make sure that we have the latest id,
// so we can start the threads and accept new procedures.
// The second step will do the actual load of old procedures.
- st = EnvironmentEdgeManager.currentTime();
+ st = System.nanoTime();
load(abortOnCorruption);
- et = EnvironmentEdgeManager.currentTime();
+ et = System.nanoTime();
LOG.info("Loaded {} in {}", store.getClass().getSimpleName(),
- StringUtils.humanTimeDiff(et - st));
+ StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));
+ }
+ /**
+ * Start the workers.
+ */
+ public void startWorkers() throws IOException {
+ if (!running.compareAndSet(false, true)) {
+ LOG.warn("Already running");
+ return;
+ }
// Start the executors. Here we must have the lastProcId set.
LOG.trace("Start workers {}", workerThreads.size());
timeoutExecutor.start();
@@ -870,7 +874,6 @@ public class ProcedureExecutor<TEnvironment> {
justification = "FindBugs is blind to the check-for-null")
public long submitProcedure(final Procedure proc, final NonceKey nonceKey) {
Preconditions.checkArgument(lastProcId.get() >= 0);
- Preconditions.checkArgument(isRunning(), "executor not running");
prepareProcedure(proc);
@@ -904,7 +907,6 @@ public class ProcedureExecutor<TEnvironment> {
// TODO: Do we need to take nonces here?
public void submitProcedures(final Procedure[] procs) {
Preconditions.checkArgument(lastProcId.get() >= 0);
- Preconditions.checkArgument(isRunning(), "executor not running");
if (procs == null || procs.length <= 0) {
return;
}
@@ -928,7 +930,6 @@ public class ProcedureExecutor<TEnvironment> {
private Procedure prepareProcedure(final Procedure proc) {
Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
- Preconditions.checkArgument(isRunning(), "executor not running");
Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc);
if (this.checkOwnerSet) {
Preconditions.checkArgument(proc.hasOwner(), "missing owner");
http://git-wip-us.apache.org/repos/asf/hbase/blob/013ea3e3/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index d29e376..e8d72f9 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -70,6 +70,12 @@ public class ProcedureTestingUtility {
restart(procExecutor, false, true, null, null);
}
+ public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
+ boolean abortOnCorruption) throws IOException {
+ procExecutor.init(numThreads, abortOnCorruption);
+ procExecutor.startWorkers();
+ }
+
public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor,
final boolean avoidTestKillDuringRestart, final boolean failOnCorrupted,
final Callable<Void> stopAction, final Callable<Void> startAction)
@@ -98,7 +104,7 @@ public class ProcedureTestingUtility {
// re-start
LOG.info("RESTART - Start");
procStore.start(storeThreads);
- procExecutor.start(execThreads, failOnCorrupted);
+ initAndStartWorkers(procExecutor, execThreads, failOnCorrupted);
if (startAction != null) {
startAction.call();
}
@@ -183,7 +189,7 @@ public class ProcedureTestingUtility {
NoopProcedureStore procStore = new NoopProcedureStore();
ProcedureExecutor<TEnv> procExecutor = new ProcedureExecutor<>(conf, env, procStore);
procStore.start(1);
- procExecutor.start(1, false);
+ initAndStartWorkers(procExecutor, 1, false);
try {
return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
} finally {
http://git-wip-us.apache.org/repos/asf/hbase/blob/013ea3e3/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java
index 3d99b31..cce4caf 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java
@@ -66,10 +66,10 @@ public class TestChildProcedures {
logDir = new Path(testDir, "proc-logs");
procEnv = new TestProcEnv();
procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
- procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
+ procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
procExecutor.testing = new ProcedureExecutor.Testing();
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
- procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
+ ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
}
@After
http://git-wip-us.apache.org/repos/asf/hbase/blob/013ea3e3/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java
index b7c59c8..8351e4c 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java
@@ -67,9 +67,9 @@ public class TestProcedureEvents {
procEnv = new TestProcEnv();
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
- procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
+ procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
procStore.start(1);
- procExecutor.start(1, true);
+ ProcedureTestingUtility.initAndStartWorkers(procExecutor, 1, true);
}
@After
http://git-wip-us.apache.org/repos/asf/hbase/blob/013ea3e3/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
index 7e660e4..a3cff58 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
@@ -71,9 +71,9 @@ public class TestProcedureExecution {
logDir = new Path(testDir, "proc-logs");
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
- procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore);
+ procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), null, procStore);
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
- procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
+ ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
}
@After
http://git-wip-us.apache.org/repos/asf/hbase/blob/013ea3e3/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java
index 1c53098..7f130ca 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java
@@ -24,7 +24,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
-import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -71,8 +70,8 @@ public class TestProcedureExecutor {
}
private void createNewExecutor(final Configuration conf, final int numThreads) throws Exception {
- procExecutor = new ProcedureExecutor(conf, procEnv, procStore);
- procExecutor.start(numThreads, true);
+ procExecutor = new ProcedureExecutor<>(conf, procEnv, procStore);
+ ProcedureTestingUtility.initAndStartWorkers(procExecutor, numThreads, true);
}
@Test
http://git-wip-us.apache.org/repos/asf/hbase/blob/013ea3e3/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java
index 86293e1..75c8d16 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java
@@ -53,17 +53,16 @@ public class TestProcedureInMemoryChore {
private HBaseCommonTestingUtility htu;
- @SuppressWarnings("rawtypes")
@Before
public void setUp() throws IOException {
htu = new HBaseCommonTestingUtility();
procEnv = new TestProcEnv();
procStore = new NoopProcedureStore();
- procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
+ procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
procExecutor.testing = new ProcedureExecutor.Testing();
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
- procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
+ ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
}
@After
http://git-wip-us.apache.org/repos/asf/hbase/blob/013ea3e3/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java
index 94a293d..2acb7dd 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java
@@ -75,7 +75,7 @@ public class TestProcedureMetrics {
procExecutor = new ProcedureExecutor<TestProcEnv>(htu.getConfiguration(), procEnv, procStore);
procExecutor.testing = new ProcedureExecutor.Testing();
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
- procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
+ ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
}
@After
http://git-wip-us.apache.org/repos/asf/hbase/blob/013ea3e3/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java
index b702314..2bf11fb 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java
@@ -72,10 +72,10 @@ public class TestProcedureNonce {
logDir = new Path(testDir, "proc-logs");
procEnv = new TestProcEnv();
procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
- procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
+ procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
procExecutor.testing = new ProcedureExecutor.Testing();
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
- procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
+ ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
}
@After
http://git-wip-us.apache.org/repos/asf/hbase/blob/013ea3e3/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
index aece1de..532fcf3 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
@@ -76,10 +76,10 @@ public class TestProcedureRecovery {
logDir = new Path(testDir, "proc-logs");
procEnv = new TestProcEnv();
procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
- procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
+ procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
procExecutor.testing = new ProcedureExecutor.Testing();
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
- procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
+ ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
procSleepInterval = 0;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/013ea3e3/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
index 7d0529e..319ddb2 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
@@ -53,7 +53,7 @@ public class TestProcedureReplayOrder {
private static final int NUM_THREADS = 16;
- private ProcedureExecutor<Void> procExecutor;
+ private ProcedureExecutor<TestProcedureEnv> procExecutor;
private TestProcedureEnv procEnv;
private ProcedureStore procStore;
@@ -74,9 +74,9 @@ public class TestProcedureReplayOrder {
logDir = new Path(testDir, "proc-logs");
procEnv = new TestProcedureEnv();
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
- procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
+ procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
procStore.start(NUM_THREADS);
- procExecutor.start(1, true);
+ ProcedureTestingUtility.initAndStartWorkers(procExecutor, 1, true);
}
@After
http://git-wip-us.apache.org/repos/asf/hbase/blob/013ea3e3/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
index 3da7c11..a9e919c 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
@@ -60,9 +60,9 @@ public class TestProcedureSuspended {
htu = new HBaseCommonTestingUtility();
procStore = new NoopProcedureStore();
- procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), procStore);
+ procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore);
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
- procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
+ ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
}
@After
http://git-wip-us.apache.org/repos/asf/hbase/blob/013ea3e3/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java
index 95f0e06..07e4330 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java
@@ -81,9 +81,9 @@ public class TestStateMachineProcedure {
logDir = new Path(testDir, "proc-logs");
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
- procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), procStore);
+ procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore);
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
- procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
+ ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
}
@After
http://git-wip-us.apache.org/repos/asf/hbase/blob/013ea3e3/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
index 7fa7682..8d9b325 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
@@ -71,10 +71,10 @@ public class TestYieldProcedures {
logDir = new Path(testDir, "proc-logs");
procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
procRunnables = new TestScheduler();
- procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(),
- procStore, procRunnables);
+ procExecutor =
+ new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore, procRunnables);
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
- procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
+ ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
}
@After
http://git-wip-us.apache.org/repos/asf/hbase/blob/013ea3e3/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index da36564..3826548 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -295,15 +295,17 @@ message RecoverMetaStateData {
enum ServerCrashState {
SERVER_CRASH_START = 1;
- SERVER_CRASH_PROCESS_META = 2;
+ SERVER_CRASH_PROCESS_META = 2[deprecated=true];
SERVER_CRASH_GET_REGIONS = 3;
- SERVER_CRASH_NO_SPLIT_LOGS = 4;
+ SERVER_CRASH_NO_SPLIT_LOGS = 4[deprecated=true];
SERVER_CRASH_SPLIT_LOGS = 5;
// Removed SERVER_CRASH_PREPARE_LOG_REPLAY = 6;
// Removed SERVER_CRASH_CALC_REGIONS_TO_ASSIGN = 7;
SERVER_CRASH_ASSIGN = 8;
SERVER_CRASH_WAIT_ON_ASSIGN = 9;
- SERVER_CRASH_HANDLE_RIT2 = 20;
+ SERVER_CRASH_SPLIT_META_LOGS = 10;
+ SERVER_CRASH_ASSIGN_META = 11;
+ SERVER_CRASH_HANDLE_RIT2 = 20[deprecated=true];
SERVER_CRASH_FINISH = 100;
}
@@ -388,3 +390,10 @@ message ReopenTableRegionsStateData {
required TableName table_name = 1;
repeated RegionLocation region = 2;
}
+
+enum InitMetaState {
+ INIT_META_ASSIGN_META = 1;
+}
+
+message InitMetaStateData {
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/013ea3e3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
index 23912d6..8515093 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
@@ -111,17 +111,14 @@ public class CatalogJanitor extends ScheduledChore {
protected void chore() {
try {
AssignmentManager am = this.services.getAssignmentManager();
- if (this.enabled.get()
- && !this.services.isInMaintenanceMode()
- && am != null
- && am.isFailoverCleanupDone()
- && !am.hasRegionsInTransition()) {
+ if (this.enabled.get() && !this.services.isInMaintenanceMode() && am != null &&
+ am.isMetaLoaded() && !am.hasRegionsInTransition()) {
scan();
} else {
LOG.warn("CatalogJanitor is disabled! Enabled=" + this.enabled.get() +
- ", maintenanceMode=" + this.services.isInMaintenanceMode() +
- ", am=" + am + ", failoverCleanupDone=" + (am != null && am.isFailoverCleanupDone()) +
- ", hasRIT=" + (am != null && am.hasRegionsInTransition()));
+ ", maintenanceMode=" + this.services.isInMaintenanceMode() + ", am=" + am +
+ ", metaLoaded=" + (am != null && am.isMetaLoaded()) + ", hasRIT=" +
+ (am != null && am.hasRegionsInTransition()));
}
} catch (IOException e) {
LOG.warn("Failed scan of catalog table", e);
http://git-wip-us.apache.org/repos/asf/hbase/blob/013ea3e3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index a5b9b88..696760c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -40,6 +40,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -80,6 +81,7 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
@@ -120,13 +122,14 @@ import org.apache.hadoop.hbase.master.procedure.DeleteNamespaceProcedure;
import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
+import org.apache.hadoop.hbase.master.procedure.InitMetaProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
-import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure;
+import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
import org.apache.hadoop.hbase.master.replication.ReplicationManager;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
@@ -223,7 +226,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings("deprecation")
public class HMaster extends HRegionServer implements MasterServices {
- private static Logger LOG = LoggerFactory.getLogger(HMaster.class.getName());
+ private static Logger LOG = LoggerFactory.getLogger(HMaster.class);
/**
* Protection against zombie master. Started once Master accepts active responsibility and
@@ -336,10 +339,6 @@ public class HMaster extends HRegionServer implements MasterServices {
// initialization may have not completed yet.
volatile boolean serviceStarted = false;
- // flag set after we complete assignMeta.
- private final ProcedureEvent serverCrashProcessingEnabled =
- new ProcedureEvent("server crash processing");
-
// Maximum time we should run balancer for
private final int maxBlancingTime;
// Maximum percent of regions in transition when balancing
@@ -547,18 +546,20 @@ public class HMaster extends HRegionServer implements MasterServices {
public void run() {
try {
if (!conf.getBoolean("hbase.testing.nocluster", false)) {
- try {
- int infoPort = putUpJettyServer();
- startActiveMasterManager(infoPort);
- } catch (Throwable t) {
- // Make sure we log the exception.
- String error = "Failed to become Active Master";
- LOG.error(error, t);
- // Abort should have been called already.
- if (!isAborted()) {
- abort(error, t);
+ Threads.setDaemonThreadRunning(new Thread(() -> {
+ try {
+ int infoPort = putUpJettyServer();
+ startActiveMasterManager(infoPort);
+ } catch (Throwable t) {
+ // Make sure we log the exception.
+ String error = "Failed to become Active Master";
+ LOG.error(error, t);
+ // Abort should have been called already.
+ if (!isAborted()) {
+ abort(error, t);
+ }
}
- }
+ }));
}
// Fall in here even if we have been aborted. Need to run the shutdown services and
// the super run call will do this for us.
@@ -708,7 +709,8 @@ public class HMaster extends HRegionServer implements MasterServices {
}
/**
- * Initialize all ZK based system trackers.
+ * Initialize all ZK based system trackers. But do not include {@link RegionServerTracker}; it
+ * should have already been initialized along with {@link ServerManager}.
* Will be overridden in tests.
*/
@VisibleForTesting
@@ -727,15 +729,8 @@ public class HMaster extends HRegionServer implements MasterServices {
this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
this.splitOrMergeTracker.start();
- // Create Assignment Manager
- this.assignmentManager = new AssignmentManager(this);
- this.assignmentManager.start();
-
this.replicationManager = new ReplicationManager(conf, zooKeeper, this);
- this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager);
- this.regionServerTracker.start();
-
this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
this.drainingServerTracker.start();
@@ -764,17 +759,37 @@ public class HMaster extends HRegionServer implements MasterServices {
/**
* Finish initialization of HMaster after becoming the primary master.
*
- * <ol>
- * <li>Initialize master components - file system manager, server manager,
- * assignment manager, region server tracker, etc</li>
- * <li>Start necessary service threads - balancer, catalog janior,
- * executor services, etc</li>
- * <li>Set cluster as UP in ZooKeeper</li>
- * <li>Wait for RegionServers to check-in</li>
- * <li>Split logs and perform data recovery, if necessary</li>
- * <li>Ensure assignment of meta/namespace regions<li>
- * <li>Handle either fresh cluster start or master failover</li>
+ * The startup order is a bit complicated but very important, do not change it unless you know
+ * what you are doing.
+ * <li>Initialize file system based components - file system manager, wal manager, table
+ * descriptors, etc</li>
+ * <li>Publish cluster id</li>
+ * <li>Here comes the most complicated part - initialize server manager, assignment manager and
+ * region server tracker
+ * <ol type='i'>
+ * <li>Create server manager</li>
+ * <li>Create procedure executor, load the procedures, but do not start workers. We will start it
+ * later after we finish scheduling SCPs to avoid scheduling duplicated SCPs for the same
+ * server</li>
+ * <li>Create assignment manager and start it, load the meta region state, but do not load data
+ * from meta region</li>
+ * <li>Start region server tracker, construct the online servers set and find out dead servers and
+ * schedule SCP for them. The online servers will be constructed by scanning zk, and we will also
+ * scan the wal directory to find out possible live region servers, and the differences between
+ * these two sets are the dead servers</li>
* </ol>
+ * </li>
+ * <li>If this is a new deploy, schedule an InitMetaProcedure to initialize meta</li>
+ * <li>Start necessary service threads - balancer, catalog janitor, executor services, and also the
+ * procedure executor, etc. Notice that the balancer must be created first as assignment manager
+ * may use it when assigning regions.</li>
+ * <li>Wait for meta to be initialized if necessary, start table state manager.</li>
+ * <li>Wait for enough region servers to check-in</li>
+ * <li>Let assignment manager load data from meta and construct region states</li>
+ * <li>Start all other things such as chore services, etc</li>
+ * <p/>
+ * Notice that now we will not schedule a special procedure to make meta online (unless it is the
+ * first time, where meta has not been created yet); we will rely on SCP to bring meta online.
*/
private void finishActiveMasterInitialization(MonitoredTask status)
throws IOException, InterruptedException, KeeperException {
@@ -813,10 +828,20 @@ public class HMaster extends HRegionServer implements MasterServices {
ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
this.clusterId = clusterId.toString();
- this.serverManager = createServerManager(this);
- // This manager is started AFTER hbase:meta is confirmed on line.
- // See inside metaBootstrap.recoverMeta(); below. Shouldn't be so cryptic!
+
+ status.setStatus("Initialize ServerManager and schedule SCP for crashed servers");
+ this.serverManager = createServerManager(this);
+ createProcedureExecutor();
+ // Create Assignment Manager
+ this.assignmentManager = new AssignmentManager(this);
+ this.assignmentManager.start();
+ this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager);
+ this.regionServerTracker.start(
+ procedureExecutor.getProcedures().stream().filter(p -> p instanceof ServerCrashProcedure)
+ .map(p -> ((ServerCrashProcedure) p).getServerName()).collect(Collectors.toSet()),
+ walManager.getLiveServersFromWALDir());
+ // This manager will be started AFTER hbase:meta is confirmed on line.
// hbase.mirror.table.state.to.zookeeper is so hbase1 clients can connect. They read table
// state from zookeeper while hbase2 reads it from hbase:meta. Disable if no hbase1 clients.
this.tableStateManager =
@@ -845,10 +870,37 @@ public class HMaster extends HRegionServer implements MasterServices {
status.setStatus("Initializing master coprocessors");
this.cpHost = new MasterCoprocessorHost(this, this.conf);
+ status.setStatus("Initializing meta table if this is a new deploy");
+ InitMetaProcedure initMetaProc = null;
+ if (assignmentManager.getRegionStates().getRegionState(RegionInfoBuilder.FIRST_META_REGIONINFO)
+ .isOffline()) {
+ Optional<Procedure<?>> optProc = procedureExecutor.getProcedures().stream()
+ .filter(p -> p instanceof InitMetaProcedure).findAny();
+ if (optProc.isPresent()) {
+ initMetaProc = (InitMetaProcedure) optProc.get();
+ } else {
+ // schedule an init meta procedure if meta has not been deployed yet
+ initMetaProc = new InitMetaProcedure();
+ procedureExecutor.submitProcedure(initMetaProc);
+ }
+ }
+ if (this.balancer instanceof FavoredNodesPromoter) {
+ favoredNodesManager = new FavoredNodesManager(this);
+ }
+
+ // initialize load balancer
+ this.balancer.setMasterServices(this);
+ this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
+ this.balancer.initialize();
+
// start up all service threads.
status.setStatus("Initializing master service threads");
startServiceThreads();
-
+ // wait for meta to be initialized after we start the procedure executor
+ if (initMetaProc != null) {
+ initMetaProc.await();
+ }
+ tableStateManager.start();
// Wake up this server to check in
sleeper.skipSleepCycle();
@@ -860,28 +912,11 @@ public class HMaster extends HRegionServer implements MasterServices {
LOG.info(Objects.toString(status));
waitForRegionServers(status);
- if (this.balancer instanceof FavoredNodesPromoter) {
- favoredNodesManager = new FavoredNodesManager(this);
- }
-
- //initialize load balancer
- this.balancer.setMasterServices(this);
- this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
- this.balancer.initialize();
-
- // Make sure meta assigned before proceeding.
- status.setStatus("Recovering Meta Region");
-
// Check if master is shutting down because issue initializing regionservers or balancer.
if (isStopped()) {
return;
}
- // Bring up hbase:meta. recoverMeta is a blocking call waiting until hbase:meta is deployed.
- // It also starts the TableStateManager.
- MasterMetaBootstrap metaBootstrap = createMetaBootstrap();
- metaBootstrap.recoverMeta();
-
//Initialize after meta as it scans meta
if (favoredNodesManager != null) {
SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment =
@@ -890,9 +925,6 @@ public class HMaster extends HRegionServer implements MasterServices {
favoredNodesManager.initialize(snapshotOfRegionAssignment);
}
- status.setStatus("Submitting log splitting work for previously failed region servers");
- metaBootstrap.processDeadServers();
-
// Fix up assignment manager status
status.setStatus("Starting assignment manager");
this.assignmentManager.joinCluster();
@@ -933,6 +965,7 @@ public class HMaster extends HRegionServer implements MasterServices {
setInitialized(true);
assignmentManager.checkIfShouldMoveSystemRegionAsync();
status.setStatus("Assign meta replicas");
+ MasterMetaBootstrap metaBootstrap = createMetaBootstrap();
metaBootstrap.assignMetaReplicas();
status.setStatus("Starting quota manager");
initQuotaManager();
@@ -1075,7 +1108,6 @@ public class HMaster extends HRegionServer implements MasterServices {
private void initQuotaManager() throws IOException {
MasterQuotaManager quotaManager = new MasterQuotaManager(this);
- this.assignmentManager.setRegionStateListener(quotaManager);
quotaManager.start();
this.quotaManager = quotaManager;
}
@@ -1221,10 +1253,10 @@ public class HMaster extends HRegionServer implements MasterServices {
if (this.mpmHost != null) this.mpmHost.stop("server shutting down.");
}
- private void startProcedureExecutor() throws IOException {
- final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
- procedureStore = new WALProcedureStore(conf,
- new MasterProcedureEnv.WALStoreLeaseRecovery(this));
+ private void createProcedureExecutor() throws IOException {
+ MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
+ procedureStore =
+ new WALProcedureStore(conf, new MasterProcedureEnv.WALStoreLeaseRecovery(this));
procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
MasterProcedureScheduler procedureScheduler = procEnv.getProcedureScheduler();
procedureExecutor = new ProcedureExecutor<>(conf, procEnv, procedureStore, procedureScheduler);
@@ -1237,10 +1269,17 @@ public class HMaster extends HRegionServer implements MasterServices {
conf.getBoolean(MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
procedureStore.start(numThreads);
- procedureExecutor.start(numThreads, abortOnCorruption);
+ // Just initialize it but do not start the workers, we will start the workers later by calling
+ // startProcedureExecutor. See the javadoc for finishActiveMasterInitialization for more
+ // details.
+ procedureExecutor.init(numThreads, abortOnCorruption);
procEnv.getRemoteDispatcher().start();
}
+ private void startProcedureExecutor() throws IOException {
+ procedureExecutor.startWorkers();
+ }
+
private void stopProcedureExecutor() {
if (procedureExecutor != null) {
configurationManager.deregisterObserver(procedureExecutor.getEnvironment());
@@ -2836,25 +2875,6 @@ public class HMaster extends HRegionServer implements MasterServices {
}
/**
- * ServerCrashProcessingEnabled is set false before completing assignMeta to prevent processing
- * of crashed servers.
- * @return true if assignMeta has completed;
- */
- @Override
- public boolean isServerCrashProcessingEnabled() {
- return serverCrashProcessingEnabled.isReady();
- }
-
- @VisibleForTesting
- public void setServerCrashProcessingEnabled(final boolean b) {
- procedureExecutor.getEnvironment().setEventReady(serverCrashProcessingEnabled, b);
- }
-
- public ProcedureEvent getServerCrashProcessingEnabledEvent() {
- return serverCrashProcessingEnabled;
- }
-
- /**
* Compute the average load across all region servers.
* Currently, this uses a very naive computation - just uses the number of
* regions being served, ignoring stats about number of requests.
@@ -3624,18 +3644,6 @@ public class HMaster extends HRegionServer implements MasterServices {
return lockManager;
}
- @Override
- public boolean recoverMeta() throws IOException {
- // we need to block here so the latch should be greater than the current version to make sure
- // that we will block.
- ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(Integer.MAX_VALUE, 0);
- procedureExecutor.submitProcedure(new RecoverMetaProcedure(null, true, latch));
- latch.await();
- LOG.info("hbase:meta deployed at={}",
- getMetaTableLocator().getMetaRegionLocation(getZooKeeper()));
- return assignmentManager.isMetaInitialized();
- }
-
public QuotaObserverChore getQuotaObserverChore() {
return this.quotaObserverChore;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/013ea3e3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java
index f140827..4bf451c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.util.List;
-import java.util.Set;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
@@ -39,7 +38,7 @@ import org.slf4j.LoggerFactory;
* Used by the HMaster on startup to split meta logs and assign the meta table.
*/
@InterfaceAudience.Private
-public class MasterMetaBootstrap {
+class MasterMetaBootstrap {
private static final Logger LOG = LoggerFactory.getLogger(MasterMetaBootstrap.class);
private final HMaster master;
@@ -48,35 +47,12 @@ public class MasterMetaBootstrap {
this.master = master;
}
- public void recoverMeta() throws InterruptedException, IOException {
- // This is a blocking call that waits until hbase:meta is deployed.
- master.recoverMeta();
- // Now we can start the TableStateManager. It is backed by hbase:meta.
- master.getTableStateManager().start();
- // Enable server crash procedure handling
- enableCrashedServerProcessing();
- }
-
- public void processDeadServers() {
- // get a list for previously failed RS which need log splitting work
- // we recover hbase:meta region servers inside master initialization and
- // handle other failed servers in SSH in order to start up master node ASAP
- Set<ServerName> previouslyFailedServers =
- master.getMasterWalManager().getFailedServersFromLogFolders();
-
- // Master has recovered hbase:meta region server and we put
- // other failed region servers in a queue to be handled later by SSH
- for (ServerName tmpServer : previouslyFailedServers) {
- master.getServerManager().processDeadServer(tmpServer, true);
- }
- }
-
/**
* For assigning hbase:meta replicas only.
* TODO: The way this assign runs, nothing but chance to stop all replicas showing up on same
* server as the hbase:meta region.
*/
- protected void assignMetaReplicas()
+ void assignMetaReplicas()
throws IOException, InterruptedException, KeeperException {
int numReplicas = master.getConfiguration().getInt(HConstants.META_REPLICAS_NUM,
HConstants.DEFAULT_META_REPLICA_NUM);
@@ -85,7 +61,7 @@ public class MasterMetaBootstrap {
return;
}
final AssignmentManager assignmentManager = master.getAssignmentManager();
- if (!assignmentManager.isMetaInitialized()) {
+ if (!assignmentManager.isMetaLoaded()) {
throw new IllegalStateException("hbase:meta must be initialized first before we can " +
"assign out its replicas");
}
@@ -137,15 +113,4 @@ public class MasterMetaBootstrap {
LOG.warn("Ignoring exception " + ex);
}
}
-
- private void enableCrashedServerProcessing() throws InterruptedException {
- // If crashed server processing is disabled, we enable it and expire those dead but not expired
- // servers. This is required so that if meta is assigning to a server which dies after
- // assignMeta starts assignment, ServerCrashProcedure can re-assign it. Otherwise, we will be
- // stuck here waiting forever if waitForMeta is specified.
- if (!master.isServerCrashProcessingEnabled()) {
- master.setServerCrashProcessingEnabled(true);
- master.getServerManager().processQueuedDeadServers();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/013ea3e3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index 11f5975..1892600 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -322,11 +322,6 @@ public interface MasterServices extends Server {
TableDescriptors getTableDescriptors();
/**
- * @return true if master enables ServerShutdownHandler;
- */
- boolean isServerCrashProcessingEnabled();
-
- /**
* Registers a new protocol buffer {@link Service} subclass as a master coprocessor endpoint.
*
* <p>
@@ -490,13 +485,6 @@ public interface MasterServices extends Server {
*/
public void checkIfShouldMoveSystemRegionAsync();
- /**
- * Recover meta table. Will result in no-op is meta is already initialized. Any code that has
- * access to master and requires to access meta during process initialization can call this
- * method to make sure meta is initialized.
- */
- boolean recoverMeta() throws IOException;
-
String getClientIdAuditPrefix();
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/013ea3e3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
index 4070ed3..2dc8918 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
@@ -26,7 +26,8 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -142,10 +143,34 @@ public class MasterWalManager {
return this.fsOk;
}
+ public Set<ServerName> getLiveServersFromWALDir() throws IOException {
+ Path walDirPath = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
+ FileStatus[] walDirForLiveServers = FSUtils.listStatus(fs, walDirPath,
+ p -> !p.getName().endsWith(AbstractFSWALProvider.SPLITTING_EXT));
+ if (walDirForLiveServers == null) {
+ return Collections.emptySet();
+ }
+ return Stream.of(walDirForLiveServers).map(s -> {
+ ServerName serverName = AbstractFSWALProvider.getServerNameFromWALDirectoryName(s.getPath());
+ if (serverName == null) {
+ LOG.warn("Log folder {} doesn't look like its name includes a " +
+ "region server name; leaving in place. If you see later errors about missing " +
+ "write ahead logs they may be saved in this location.", s.getPath());
+ return null;
+ }
+ return serverName;
+ }).filter(s -> s != null).collect(Collectors.toSet());
+ }
+
/**
* Inspect the log directory to find dead servers which need recovery work
* @return A set of ServerNames which aren't running but still have WAL files left in file system
+ * @deprecated With proc-v2, we can record the crash server with procedure store, so do not need
+ * to scan the wal directory to find out the splitting wal directory any more. Leave
+ * it here only because {@code RecoverMetaProcedure}(which is also deprecated) uses
+ * it.
*/
+ @Deprecated
public Set<ServerName> getFailedServersFromLogFolders() {
boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors",
WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT);
@@ -240,6 +265,7 @@ public class MasterWalManager {
boolean needReleaseLock = false;
if (!this.services.isInitialized()) {
// during master initialization, we could have multiple places splitting a same wal
+ // XXX: Does this still exist after we move to proc-v2?
this.splitLogLock.lock();
needReleaseLock = true;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/013ea3e3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java
index 51371c9..83c8afd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java
@@ -73,8 +73,8 @@ public class RegionServerTracker extends ZKListener {
super(watcher);
this.server = server;
this.serverManager = serverManager;
- executor = Executors.newSingleThreadExecutor(
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("RegionServerTracker-%d").build());
+ this.executor = Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("RegionServerTracker-%d").build());
}
private Pair<ServerName, RegionServerInfo> getServerInfo(String name)
@@ -108,9 +108,19 @@ public class RegionServerTracker extends ZKListener {
/**
* Starts the tracking of online RegionServers.
*
- * All RSs will be tracked after this method is called.
+ * All RSes will be tracked after this method is called.
+ * <p/>
+ * In this method, we will also construct the region server sets in {@link ServerManager}. If a
+ * region server is dead between the crash of the previous master instance and the start of the
+ * current master instance, we will schedule a SCP for it. This is done in
+ * {@link ServerManager#findOutDeadServersAndProcess(Set, Set)}; we call it here under the lock
+ * protection to prevent concurrency issues with server expiration operation.
+ * @param deadServersFromPE the region servers which already have SCP associated.
+ * @param liveServersFromWALDir the live region servers from wal directory.
*/
- public void start() throws KeeperException, IOException {
+ public void start(Set<ServerName> deadServersFromPE, Set<ServerName> liveServersFromWALDir)
+ throws KeeperException, IOException {
watcher.registerListener(this);
synchronized (this) {
List<String> servers =
@@ -126,6 +136,7 @@ public class RegionServerTracker extends ZKListener {
: ServerMetricsBuilder.of(serverName);
serverManager.checkAndRecordNewServer(serverName, serverMetrics);
}
+ serverManager.findOutDeadServersAndProcess(deadServersFromPE, liveServersFromWALDir);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/013ea3e3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index ebccf3c..db335c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -25,13 +25,10 @@ import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -146,41 +143,6 @@ public class ServerManager {
private final RpcControllerFactory rpcControllerFactory;
- /**
- * Set of region servers which are dead but not processed immediately. If one
- * server died before master enables ServerShutdownHandler, the server will be
- * added to this set and will be processed through calling
- * {@link ServerManager#processQueuedDeadServers()} by master.
- * <p>
- * A dead server is a server instance known to be dead, not listed in the /hbase/rs
- * znode any more. It may have not been submitted to ServerShutdownHandler yet
- * because the handler is not enabled.
- * <p>
- * A dead server, which has been submitted to ServerShutdownHandler while the
- * handler is not enabled, is queued up.
- * <p>
- * So this is a set of region servers known to be dead but not submitted to
- * ServerShutdownHandler for processing yet.
- */
- private Set<ServerName> queuedDeadServers = new HashSet<>();
-
- /**
- * Set of region servers which are dead and submitted to ServerShutdownHandler to process but not
- * fully processed immediately.
- * <p>
- * If one server died before assignment manager finished the failover cleanup, the server will be
- * added to this set and will be processed through calling
- * {@link ServerManager#processQueuedDeadServers()} by assignment manager.
- * <p>
- * The Boolean value indicates whether log split is needed inside ServerShutdownHandler
- * <p>
- * ServerShutdownHandler processes a dead server submitted to the handler after the handler is
- * enabled. It may not be able to complete the processing because meta is not yet online or master
- * is currently in startup mode. In this case, the dead server will be parked in this set
- * temporarily.
- */
- private Map<ServerName, Boolean> requeuedDeadServers = new ConcurrentHashMap<>();
-
/** Listeners that are called on server events. */
private List<ServerListener> listeners = new CopyOnWriteArrayList<>();
@@ -342,6 +304,26 @@ public class ServerManager {
}
/**
+ * Find out the region servers crashed between the crash of the previous master instance and the
+ * current master instance and schedule SCP for them.
+ * <p/>
+ * Since the {@code RegionServerTracker} has already helped us to construct the online servers set
+ * by scanning zookeeper, now we can compare the online servers with {@code liveServersFromWALDir}
+ * to find out whether there are servers which are already dead.
+ * <p/>
+ * Must be called inside the initialization method of {@code RegionServerTracker} to avoid
+ * concurrency issues.
+ * @param deadServersFromPE the region servers which already have SCP associated.
+ * @param liveServersFromWALDir the live region servers from wal directory.
+ */
+ void findOutDeadServersAndProcess(Set<ServerName> deadServersFromPE,
+ Set<ServerName> liveServersFromWALDir) {
+ deadServersFromPE.forEach(deadservers::add);
+ liveServersFromWALDir.stream().filter(sn -> !onlineServers.containsKey(sn))
+ .forEach(this::expireServer);
+ }
+
+ /**
* Checks if the clock skew between the server and the master. If the clock skew exceeds the
* configured max, it will throw an exception; if it exceeds the configured warning threshold,
* it will log a warning but start normally.
@@ -350,7 +332,7 @@ public class ServerManager {
* @throws ClockOutOfSyncException if the skew exceeds the configured max value
*/
private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
- throws ClockOutOfSyncException {
+ throws ClockOutOfSyncException {
long skew = Math.abs(System.currentTimeMillis() - serverCurrentTime);
if (skew > maxSkew) {
String message = "Server " + serverName + " has been " +
@@ -370,9 +352,7 @@ public class ServerManager {
* If this server is on the dead list, reject it with a YouAreDeadException.
* If it was dead but came back with a new start code, remove the old entry
* from the dead list.
- * @param serverName
* @param what START or REPORT
- * @throws org.apache.hadoop.hbase.YouAreDeadException
*/
private void checkIsDead(final ServerName serverName, final String what)
throws YouAreDeadException {
@@ -548,13 +528,12 @@ public class ServerManager {
return ZKUtil.listChildrenNoWatch(zkw, zkw.znodePaths.rsZNode);
}
- /*
- * Expire the passed server. Add it to list of dead servers and queue a
- * shutdown processing.
- * @return True if we queued a ServerCrashProcedure else false if we did not (could happen
- * for many reasons including the fact that its this server that is going down or we already
- * have queued an SCP for this server or SCP processing is currently disabled because we are
- * in startup phase).
+ /**
+ * Expire the passed server. Add it to list of dead servers and queue a shutdown processing.
+ * @return True if we queued a ServerCrashProcedure else false if we did not (could happen for
+ * many reasons including the fact that it's this server that is going down or we already
+ * have queued an SCP for this server or SCP processing is currently disabled because we
+ * are in startup phase).
*/
public synchronized boolean expireServer(final ServerName serverName) {
// THIS server is going down... can't handle our own expiration.
@@ -564,18 +543,6 @@ public class ServerManager {
}
return false;
}
- // No SCP handling during startup.
- if (!master.isServerCrashProcessingEnabled()) {
- LOG.info("Master doesn't enable ServerShutdownHandler during initialization, "
- + "delay expiring server " + serverName);
- // Even though we delay expire of this server, we still need to handle Meta's RIT
- // that are against the crashed server; since when we do RecoverMetaProcedure,
- // the SCP is not enabled yet and Meta's RIT may be suspend forever. See HBase-19287
- master.getAssignmentManager().handleMetaRITOnCrashedServer(serverName);
- this.queuedDeadServers.add(serverName);
- // Return true because though on SCP queued, there will be one queued later.
- return true;
- }
if (this.deadservers.isDeadServer(serverName)) {
LOG.warn("Expiration called on {} but crash processing already in progress", serverName);
return false;
@@ -620,52 +587,6 @@ public class ServerManager {
this.rsAdmins.remove(sn);
}
- public synchronized void processDeadServer(final ServerName serverName, boolean shouldSplitWal) {
- // When assignment manager is cleaning up the zookeeper nodes and rebuilding the
- // in-memory region states, region servers could be down. Meta table can and
- // should be re-assigned, log splitting can be done too. However, it is better to
- // wait till the cleanup is done before re-assigning user regions.
- //
- // We should not wait in the server shutdown handler thread since it can clog
- // the handler threads and meta table could not be re-assigned in case
- // the corresponding server is down. So we queue them up here instead.
- if (!master.getAssignmentManager().isFailoverCleanupDone()) {
- requeuedDeadServers.put(serverName, shouldSplitWal);
- return;
- }
-
- this.deadservers.add(serverName);
- master.getAssignmentManager().submitServerCrash(serverName, shouldSplitWal);
- }
-
- /**
- * Process the servers which died during master's initialization. It will be
- * called after HMaster#assignMeta and AssignmentManager#joinCluster.
- * */
- synchronized void processQueuedDeadServers() {
- if (!master.isServerCrashProcessingEnabled()) {
- LOG.info("Master hasn't enabled ServerShutdownHandler");
- }
- Iterator<ServerName> serverIterator = queuedDeadServers.iterator();
- while (serverIterator.hasNext()) {
- ServerName tmpServerName = serverIterator.next();
- expireServer(tmpServerName);
- serverIterator.remove();
- requeuedDeadServers.remove(tmpServerName);
- }
-
- if (!master.getAssignmentManager().isFailoverCleanupDone()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("AssignmentManager failover cleanup not done.");
- }
- }
-
- for (Map.Entry<ServerName, Boolean> entry : requeuedDeadServers.entrySet()) {
- processDeadServer(entry.getKey(), entry.getValue());
- }
- requeuedDeadServers.clear();
- }
-
/*
* Remove the server from the drain list.
*/
@@ -930,13 +851,6 @@ public class ServerManager {
return new ArrayList<>(this.drainingServers);
}
- /**
- * @return A copy of the internal set of deadNotExpired servers.
- */
- Set<ServerName> getDeadNotExpiredServers() {
- return new HashSet<>(this.queuedDeadServers);
- }
-
public boolean isServerOnline(ServerName serverName) {
return serverName != null && onlineServers.containsKey(serverName);
}
@@ -948,9 +862,7 @@ public class ServerManager {
* master any more, for example, a very old previous instance).
*/
public synchronized boolean isServerDead(ServerName serverName) {
- return serverName == null || deadservers.isDeadServer(serverName)
- || queuedDeadServers.contains(serverName)
- || requeuedDeadServers.containsKey(serverName);
+ return serverName == null || deadservers.isDeadServer(serverName);
}
public void shutdownCluster() {
@@ -990,8 +902,6 @@ public class ServerManager {
final List<ServerName> drainingServersCopy = getDrainingServersList();
destServers.removeAll(drainingServersCopy);
- // Remove the deadNotExpired servers from the server list.
- removeDeadNotExpiredServers(destServers);
return destServers;
}
@@ -1002,23 +912,6 @@ public class ServerManager {
return createDestinationServersList(null);
}
- /**
- * Loop through the deadNotExpired server list and remove them from the
- * servers.
- * This function should be used carefully outside of this class. You should use a high level
- * method such as {@link #createDestinationServersList()} instead of managing you own list.
- */
- void removeDeadNotExpiredServers(List<ServerName> servers) {
- Set<ServerName> deadNotExpiredServersCopy = this.getDeadNotExpiredServers();
- if (!deadNotExpiredServersCopy.isEmpty()) {
- for (ServerName server : deadNotExpiredServersCopy) {
- LOG.debug("Removing dead but not expired server: " + server
- + " from eligible server pool.");
- servers.remove(server);
- }
- }
- }
-
/**
* To clear any dead server with same host name and port of any online server
*/