You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by al...@apache.org on 2018/11/05 12:23:48 UTC
hbase git commit: HBASE-21423 Procedures for meta table/region should be able to execute in separate workers
Repository: hbase
Updated Branches:
refs/heads/branch-2.0 5834a4f90 -> 6b8cfd276
HBASE-21423 Procedures for meta table/region should be able to execute in separate workers
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6b8cfd27
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6b8cfd27
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6b8cfd27
Branch: refs/heads/branch-2.0
Commit: 6b8cfd276fc1e33c691f93575ed9ff2df06c08e2
Parents: 5834a4f
Author: Allan Yang <al...@apache.org>
Authored: Mon Nov 5 20:23:19 2018 +0800
Committer: Allan Yang <al...@apache.org>
Committed: Mon Nov 5 20:23:19 2018 +0800
----------------------------------------------------------------------
.../procedure2/AbstractProcedureScheduler.java | 29 ++-
.../hbase/procedure2/ProcedureExecutor.java | 64 ++++++-
.../hbase/procedure2/ProcedureScheduler.java | 17 ++
.../procedure2/SimpleProcedureScheduler.java | 2 +-
.../procedure2/ProcedureTestingUtility.java | 10 +-
.../hbase/procedure2/TestChildProcedures.java | 2 +-
.../hbase/procedure2/TestProcedureExecutor.java | 2 +-
.../procedure2/TestProcedureSuspended.java | 3 +-
.../hbase/procedure2/TestYieldProcedures.java | 4 +-
.../org/apache/hadoop/hbase/master/HMaster.java | 5 +-
.../procedure/MasterProcedureConstants.java | 7 +
.../procedure/MasterProcedureScheduler.java | 8 +-
.../TestSplitTableRegionProcedure.java | 2 +
.../master/procedure/TestProcedurePriority.java | 1 +
.../procedure/TestServerCrashProcedure.java | 2 +
.../procedure/TestTableDDLProcedureBase.java | 2 +
.../procedure/TestUrgentProcedureWorker.java | 188 +++++++++++++++++++
17 files changed, 327 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
index 7ab1329..b2a2e5a 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
@@ -139,20 +139,39 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
* NOTE: this method is called with the sched lock held.
* @return the Procedure to execute, or null if nothing is available.
*/
- protected abstract Procedure dequeue();
+ protected Procedure dequeue() {
+ return dequeue(false);
+ }
+
+ /**
+ * Fetch one Procedure from the queue, optionally restricted to urgent procedures.
+ * NOTE: this method is called with the sched lock held.
+ * @param onlyUrgent if true, only return a procedure the scheduler treats as urgent
+ * @return the Procedure to execute, or null if nothing is available.
+ */
+ protected abstract Procedure dequeue(boolean onlyUrgent);
+
+
+ @Override
+ public Procedure poll(boolean onlyUrgent) {
+ return poll(onlyUrgent, -1);
+ }
@Override
public Procedure poll() {
- return poll(-1);
+ return poll(false, -1);
+ }
+
+ @Override
+ public Procedure poll(boolean onlyUrgent, long timeout, TimeUnit unit) {
+ return poll(onlyUrgent, unit.toNanos(timeout));
}
@Override
public Procedure poll(long timeout, TimeUnit unit) {
- return poll(unit.toNanos(timeout));
+ return poll(false, unit.toNanos(timeout));
}
- @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
public Procedure poll(final long nanos) {
+ return poll(false, nanos);
+ }
+
+ /**
+ * Fetch one Procedure, waiting up to the given time for one to become available.
+ * Takes the sched lock and delegates to {@link #dequeue(boolean)}.
+ * @param onlyUrgent if true, only dequeue urgent procedures (see {@link #dequeue(boolean)})
+ * @param nanos time to wait in nanoseconds; the no-timeout poll variants pass -1
+ * @return the Procedure to execute, or null if nothing is available.
+ */
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
+ public Procedure poll(final boolean onlyUrgent, final long nanos) {
schedLock();
try {
if (!running) {
@@ -173,7 +192,7 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
return null;
}
}
- final Procedure pollResult = dequeue();
+ final Procedure pollResult = dequeue(onlyUrgent);
pollCalls++;
nullPollCalls += (pollResult == null) ? 1 : 0;
http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index b1f3de3..3bd5e0f 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -209,6 +209,11 @@ public class ProcedureExecutor<TEnvironment> {
private CopyOnWriteArrayList<WorkerThread> workerThreads;
/**
+ * Worker threads only for urgent tasks. Copy-on-write because each worker removes itself on exit.
+ */
+ private CopyOnWriteArrayList<WorkerThread> urgentWorkerThreads;
+
+ /**
* Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
* resource handling rather than observing in a #join is unexpected).
* Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
@@ -218,6 +223,7 @@ public class ProcedureExecutor<TEnvironment> {
private int corePoolSize;
private int maxPoolSize;
+ private int urgentPoolSize;
private volatile long keepAliveTime;
@@ -558,12 +564,30 @@ public class ProcedureExecutor<TEnvironment> {
* is found on replay. otherwise false.
*/
public void init(int numThreads, boolean abortOnCorruption) throws IOException {
+ init(numThreads, 1, abortOnCorruption);
+ }
+
+ /**
+ * Initialize the procedure executor, but do not start workers. We will start them later.
+ * <p/>
+ * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, and
+ * ensure a single executor, and start the procedure replay to resume and recover the previous
+ * pending and in-progress procedures.
+ * @param numThreads number of threads available for procedure execution.
+ * @param urgentNumThreads number of threads available for urgent procedure execution.
+ * @param abortOnCorruption true if you want to abort your service in case a corrupted procedure
+ * is found on replay. otherwise false.
+ */
+ public void init(int numThreads, int urgentNumThreads,
+ boolean abortOnCorruption) throws IOException {
// We have numThreads executor + one timer thread used for timing out
// procedures and triggering periodic procedures.
this.corePoolSize = numThreads;
this.maxPoolSize = 10 * numThreads;
- LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker count={}",
- corePoolSize, maxPoolSize);
+ this.urgentPoolSize = urgentNumThreads;
+ LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker "
+ + "count={}, start {} urgent thread(s)",
+ corePoolSize, maxPoolSize, urgentPoolSize);
this.threadGroup = new ThreadGroup("PEWorkerGroup");
this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup);
@@ -571,9 +595,14 @@ public class ProcedureExecutor<TEnvironment> {
// Create the workers
workerId.set(0);
workerThreads = new CopyOnWriteArrayList<>();
+ urgentWorkerThreads = new CopyOnWriteArrayList<>();
for (int i = 0; i < corePoolSize; ++i) {
workerThreads.add(new WorkerThread(threadGroup));
}
+ for (int i = 0; i < urgentPoolSize; ++i) {
+ urgentWorkerThreads
+ .add(new WorkerThread(threadGroup, "UrgentPEWorker-", true));
+ }
long st, et;
@@ -608,12 +637,17 @@ public class ProcedureExecutor<TEnvironment> {
return;
}
// Start the executors. Here we must have the lastProcId set.
- LOG.trace("Start workers {}", workerThreads.size());
+ LOG.debug("Start workers {}, urgent workers {}", workerThreads.size(),
+ urgentWorkerThreads.size());
timeoutExecutor.start();
for (WorkerThread worker: workerThreads) {
worker.start();
}
+ for (WorkerThread worker: urgentWorkerThreads) {
+ worker.start();
+ }
+
// Internal chores
timeoutExecutor.add(new WorkerMonitor());
@@ -663,6 +697,11 @@ public class ProcedureExecutor<TEnvironment> {
worker.awaitTermination();
}
+ // stop the urgent worker threads
+ for (WorkerThread worker: urgentWorkerThreads) {
+ worker.awaitTermination();
+ }
+
// Destroy the Thread Group for the executors
// TODO: Fix. #join is not place to destroy resources.
try {
@@ -700,7 +739,7 @@ public class ProcedureExecutor<TEnvironment> {
* @return the current number of worker threads.
*/
public int getWorkerThreadCount() {
- return workerThreads.size();
+ return workerThreads.size() + urgentWorkerThreads.size();
}
/**
@@ -710,6 +749,11 @@ public class ProcedureExecutor<TEnvironment> {
return corePoolSize;
}
+ /** @return the number of urgent-only workers requested in init(). */
+ public int getUrgentPoolSize() {
+ return urgentPoolSize;
+ }
+
public int getActiveExecutorCount() {
return activeExecutorCount.get();
}
@@ -1949,13 +1993,18 @@ public class ProcedureExecutor<TEnvironment> {
private class WorkerThread extends StoppableThread {
private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);
private volatile Procedure<TEnvironment> activeProcedure;
-
+ private final boolean onlyPollUrgent;
public WorkerThread(ThreadGroup group) {
this(group, "PEWorker-");
}
protected WorkerThread(ThreadGroup group, String prefix) {
+ this(group, prefix, false);
+ }
+
+ protected WorkerThread(ThreadGroup group, String prefix, boolean onlyPollUrgent) {
super(group, prefix + workerId.incrementAndGet());
+ this.onlyPollUrgent = onlyPollUrgent;
setDaemon(true);
}
@@ -2000,7 +2049,11 @@ public class ProcedureExecutor<TEnvironment> {
} finally {
LOG.trace("Worker terminated.");
}
- workerThreads.remove(this);
+ if (onlyPollUrgent) {
+ urgentWorkerThreads.remove(this);
+ } else {
+ workerThreads.remove(this);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
index 9489f52..2d16849 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java
@@ -104,6 +104,13 @@ public interface ProcedureScheduler {
/**
* Fetch one Procedure from the queue
+ * @param onlyUrgent if true, only poll procedures the scheduler considers urgent
+ * @return the Procedure to execute, or null if nothing present.
+ */
+ Procedure poll(boolean onlyUrgent);
+
+ /**
+ * Fetch one Procedure from the queue
* @param timeout how long to wait before giving up, in units of unit
* @param unit a TimeUnit determining how to interpret the timeout parameter
* @return the Procedure to execute, or null if nothing present.
@@ -111,6 +118,16 @@ public interface ProcedureScheduler {
Procedure poll(long timeout, TimeUnit unit);
/**
+ * Fetch one Procedure from the queue
+ * @param onlyUrgent if true, only poll procedures the scheduler considers urgent
+ * @param timeout how long to wait before giving up, in units of unit
+ * @param unit a TimeUnit determining how to interpret the timeout parameter
+ * @return the Procedure to execute, or null if nothing present.
+ */
+ Procedure poll(boolean onlyUrgent, long timeout, TimeUnit unit);
+
+
+ /**
* List lock queues.
* @return the locks
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java
index feab8be..3df0e20 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java
@@ -43,7 +43,7 @@ public class SimpleProcedureScheduler extends AbstractProcedureScheduler {
}
@Override
- protected Procedure dequeue() {
+ protected Procedure dequeue(boolean onlyUrgent) {
+ // NOTE(review): onlyUrgent is ignored; this scheduler has a single queue, so a caller polling
+ // with onlyUrgent=true still receives ordinary runnables. Confirm this is intended.
return runnables.poll();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index e82fc7d..1709f63 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -87,7 +87,12 @@ public class ProcedureTestingUtility {
public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
boolean abortOnCorruption, boolean startWorkers) throws IOException {
- procExecutor.init(numThreads, abortOnCorruption);
+ // One urgent worker, the same default as ProcedureExecutor#init(int, boolean).
+ initAndStartWorkers(procExecutor, numThreads, 1, abortOnCorruption, startWorkers);
+ }
+
+ /**
+ * Initializes the executor with {@code numThreads} normal and {@code numUrgentThreads} urgent
+ * workers, then starts the workers if {@code startWorkers} is true.
+ */
+ public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
+ int numUrgentThreads, boolean abortOnCorruption, boolean startWorkers) throws IOException {
+ procExecutor.init(numThreads, numUrgentThreads, abortOnCorruption);
if (startWorkers) {
procExecutor.startWorkers();
}
@@ -109,6 +114,7 @@ public class ProcedureTestingUtility {
final ProcedureStore procStore = procExecutor.getStore();
final int storeThreads = procExecutor.getCorePoolSize();
final int execThreads = procExecutor.getCorePoolSize();
+ // Restart with the same number of urgent workers the executor was initialized with.
+ final int urgentThreads = procExecutor.getUrgentPoolSize();
final ProcedureExecutor.Testing testing = procExecutor.testing;
if (avoidTestKillDuringRestart) {
@@ -130,7 +136,7 @@ public class ProcedureTestingUtility {
// re-start
LOG.info("RESTART - Start");
procStore.start(storeThreads);
- initAndStartWorkers(procExecutor, execThreads, failOnCorrupted, startWorkers);
+ initAndStartWorkers(procExecutor, execThreads, urgentThreads, failOnCorrupted, startWorkers);
if (startAction != null) {
startAction.call();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java
index 4f3c443..b837e82 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java
@@ -69,7 +69,9 @@ public class TestChildProcedures {
procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
procExecutor.testing = new ProcedureExecutor.Testing();
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
- ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
+ // 0 urgent workers, abortOnCorruption=true (as before), startWorkers=true.
+ ProcedureTestingUtility
+ .initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, 0, true, true);
}
@After
http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java
index 7f130ca..2fe19f3 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java
@@ -71,7 +71,8 @@ public class TestProcedureExecutor {
private void createNewExecutor(final Configuration conf, final int numThreads) throws Exception {
procExecutor = new ProcedureExecutor<>(conf, procEnv, procStore);
- ProcedureTestingUtility.initAndStartWorkers(procExecutor, numThreads, true);
+ // 0 urgent workers, abortOnCorruption=true (as before), startWorkers=true.
+ ProcedureTestingUtility.initAndStartWorkers(procExecutor, numThreads, 0, true, true);
}
@Test
http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
index c1c9187..ec9d27f 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java
@@ -62,7 +62,9 @@ public class TestProcedureSuspended {
procStore = new NoopProcedureStore();
procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore);
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
- ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
+ // 0 urgent workers, abortOnCorruption=true (as before), startWorkers=true.
+ ProcedureTestingUtility
+ .initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, 0, true, true);
}
@After
http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
index b5137b0..8a2e296 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
@@ -74,7 +74,9 @@ public class TestYieldProcedures {
procExecutor =
new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore, procRunnables);
procStore.start(PROCEDURE_EXECUTOR_SLOTS);
- ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
+ // 0 urgent workers, abortOnCorruption=true (as before), startWorkers=true.
+ ProcedureTestingUtility
+ .initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, 0, true, true);
}
@After
http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 5bab8cc..288b33f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1396,6 +1396,9 @@ public class HMaster extends HRegionServer implements MasterServices {
int cpus = Runtime.getRuntime().availableProcessors();
final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, Math.max(
(cpus > 0 ? cpus / 4 : 0), MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
+ // Extra workers that only run urgent (currently meta table) procedures; 0 creates none.
+ final int urgentWorkers = conf
+ .getInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS,
+ MasterProcedureConstants.DEFAULT_MASTER_URGENT_PROCEDURE_THREADS);
final boolean abortOnCorruption =
conf.getBoolean(MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
@@ -1403,7 +1406,7 @@ public class HMaster extends HRegionServer implements MasterServices {
// Just initialize it but do not start the workers, we will start the workers later by calling
// startProcedureExecutor. See the javadoc for finishActiveMasterInitialization for more
// details.
- procedureExecutor.init(numThreads, abortOnCorruption);
+ procedureExecutor.init(numThreads, urgentWorkers, abortOnCorruption);
procEnv.getRemoteDispatcher().start();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
index 495fab6..728ad43 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
@@ -28,6 +28,13 @@ public final class MasterProcedureConstants {
public static final String MASTER_PROCEDURE_THREADS = "hbase.master.procedure.threads";
public static final int DEFAULT_MIN_MASTER_PROCEDURE_THREADS = 16;
+ /** Number of threads used by the procedure executor for urgent procedures.
+ * For now, only meta table procedures are considered urgent; 0 disables the urgent workers.
+ */
+ public static final String MASTER_URGENT_PROCEDURE_THREADS =
+ "hbase.master.urgent.procedure.threads";
+ public static final int DEFAULT_MASTER_URGENT_PROCEDURE_THREADS = 1;
+
/**
* Procedure replay sanity check. In case a WAL is missing or unreadable we
* may lose information about pending/running procedures.
http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
index 7bab7b3..b060763 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
@@ -124,7 +124,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
@Override
protected void enqueue(final Procedure proc, final boolean addFront) {
- if (isMetaProcedure(proc)) {
+ // Table procedures on TableName.META_TABLE_NAME also go to the meta run queue, which is the
+ // only queue consulted when dequeue is called with onlyUrgent=true.
+ if (isMetaProcedure(proc) ||
+ (isTableProcedure(proc) && getTableName(proc).equals(TableName.META_TABLE_NAME))) {
doAdd(metaRunQueue, getMetaQueue(), proc, addFront);
} else if (isTableProcedure(proc)) {
doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
@@ -162,9 +163,12 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
}
@Override
- protected Procedure dequeue() {
+ protected Procedure dequeue(boolean onlyUrgent) {
// meta procedure is always the first priority
Procedure<?> pollResult = doPoll(metaRunQueue);
+ // Urgent polls only look at the meta run queue and may return null while other queues still
+ // hold runnables. NOTE(review): confirm the polling worker does not spin in that case.
+ if (onlyUrgent) {
+ return pollResult;
+ }
// For now, let server handling have precedence over table handling; presumption is that it
// is more important handling crashed servers than it is running the
// enabling/disabling tables, etc.
http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestSplitTableRegionProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestSplitTableRegionProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestSplitTableRegionProcedure.java
index 1922848..7e37e44 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestSplitTableRegionProcedure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestSplitTableRegionProcedure.java
@@ -97,6 +97,8 @@ public class TestSplitTableRegionProcedure {
private static void setupConf(Configuration conf) {
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
+ // testRecoveryAndDoubleExecution requires a single worker, so disable the urgent worker.
+ conf.setInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, 0);
conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 0);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java
index 32fb173..9008dcc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java
@@ -106,6 +106,7 @@ public class TestProcedurePriority {
public static void setUp() throws Exception {
UTIL.getConfiguration().setLong(ProcedureExecutor.WORKER_KEEP_ALIVE_TIME_CONF_KEY, 5000);
UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 4);
+ // NOTE(review): urgent workers disabled, presumably so only the core workers run procedures.
+ UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, 0);
UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, MyCP.class.getName());
UTIL.startMiniCluster(3);
CORE_POOL_SIZE =
http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
index 9f7fafe..cef60f3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
@@ -60,6 +60,8 @@ public class TestServerCrashProcedure {
private void setupConf(Configuration conf) {
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
+ // The *DoubleExecution tests require a single worker, so disable the urgent worker.
+ conf.setInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, 0);
conf.set("hbase.balancer.tablesOnMaster", "none");
conf.setInt("hbase.client.retries.number", 3);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestTableDDLProcedureBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestTableDDLProcedureBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestTableDDLProcedureBase.java
index f7cf640..9680627 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestTableDDLProcedureBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestTableDDLProcedureBase.java
@@ -39,6 +39,8 @@ public abstract class TestTableDDLProcedureBase {
private static void setupConf(Configuration conf) {
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
+ // The *DoubleExecution tests require a single worker, so disable the urgent worker.
+ conf.setInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, 0);
}
@BeforeClass
http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestUrgentProcedureWorker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestUrgentProcedureWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestUrgentProcedureWorker.java
new file mode 100644
index 0000000..c7801e4
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestUrgentProcedureWorker.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
+import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks that a procedure on the meta table can run on an urgent worker while the only normal
+ * worker is blocked by a procedure that waits for it.
+ */
+@Category({MasterTests.class, SmallTests.class})
+public class TestUrgentProcedureWorker {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestUrgentProcedureWorker.class);
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TestUrgentProcedureWorker.class);
+  /** Normal workers; the test relies on there being exactly one. */
+  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
+  /** Workers that only poll urgent (meta table) procedures. */
+  private static final int URGENT_PROCEDURE_EXECUTOR_SLOTS = 1;
+  /** Counted down by MetaProcedure; WaitingMetaProcedure blocks on it. */
+  private static final CountDownLatch metaFinished = new CountDownLatch(1);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final TableName TABLE_NAME = TableName.valueOf("TestUrgentProcedureWorker");
+
+  private static WALProcedureStore procStore;
+
+  private static ProcedureExecutor<TestEnv> procExec;
+
+  private static final class TestEnv {
+    private final MasterProcedureScheduler scheduler;
+
+    public TestEnv(MasterProcedureScheduler scheduler) {
+      this.scheduler = scheduler;
+    }
+
+    public MasterProcedureScheduler getScheduler() {
+      return scheduler;
+    }
+  }
+
+  /**
+   * Procedure on a user table that occupies the normal worker until MetaProcedure has run.
+   */
+  public static class WaitingMetaProcedure extends ProcedureTestingUtility.NoopProcedure<TestEnv>
+      implements TableProcedureInterface {
+
+    @Override
+    protected Procedure<TestEnv>[] execute(TestEnv env)
+        throws ProcedureYieldException, ProcedureSuspendedException,
+        InterruptedException {
+      metaFinished.await();
+      return null;
+    }
+
+    @Override
+    protected Procedure.LockState acquireLock(TestEnv env) {
+      if (env.getScheduler().waitTableExclusiveLock(this, getTableName())) {
+        return LockState.LOCK_EVENT_WAIT;
+      }
+      return LockState.LOCK_ACQUIRED;
+    }
+
+    @Override
+    protected void releaseLock(TestEnv env) {
+      env.getScheduler().wakeTableExclusiveLock(this, getTableName());
+    }
+
+    @Override
+    protected boolean holdLock(TestEnv env) {
+      return true;
+    }
+
+    @Override
+    public TableName getTableName() {
+      return TABLE_NAME;
+    }
+
+    @Override
+    public TableOperationType getTableOperationType() {
+      return TableOperationType.EDIT;
+    }
+  }
+
+  /**
+   * Procedure on the meta table; the scheduler queues it on the meta run queue, which urgent
+   * workers poll. Releases WaitingMetaProcedure when executed.
+   */
+  public static class MetaProcedure extends ProcedureTestingUtility.NoopProcedure<TestEnv>
+      implements TableProcedureInterface {
+
+    @Override
+    protected Procedure<TestEnv>[] execute(TestEnv env)
+        throws ProcedureYieldException, ProcedureSuspendedException,
+        InterruptedException {
+      metaFinished.countDown();
+      return null;
+    }
+
+    @Override
+    protected Procedure.LockState acquireLock(TestEnv env) {
+      if (env.getScheduler().waitTableExclusiveLock(this, getTableName())) {
+        return LockState.LOCK_EVENT_WAIT;
+      }
+      return LockState.LOCK_ACQUIRED;
+    }
+
+    @Override
+    protected void releaseLock(TestEnv env) {
+      env.getScheduler().wakeTableExclusiveLock(this, getTableName());
+    }
+
+    @Override
+    protected boolean holdLock(TestEnv env) {
+      return true;
+    }
+
+    @Override
+    public TableName getTableName() {
+      return TableName.META_TABLE_NAME;
+    }
+
+    @Override
+    public TableOperationType getTableOperationType() {
+      return TableOperationType.EDIT;
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws IOException {
+    // Release WaitingMetaProcedure in case the test failed before MetaProcedure ran, then stop
+    // the executor and the store before their test directory is removed.
+    metaFinished.countDown();
+    if (procExec != null) {
+      procExec.stop();
+    }
+    if (procStore != null) {
+      procStore.stop(false);
+    }
+    UTIL.cleanupTestDir();
+  }
+
+  @BeforeClass
+  public static void setUp() throws IOException {
+    // Presumably keeps the worker monitor from treating the blocked worker as stuck (and adding
+    // workers), which would let the test pass without the urgent worker.
+    UTIL.getConfiguration().setInt("hbase.procedure.worker.stuck.threshold.msec", 6000000);
+    procStore = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(),
+        UTIL.getDataTestDir("TestUrgentProcedureWorker"));
+    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
+    MasterProcedureScheduler scheduler = new MasterProcedureScheduler(pid -> null);
+    procExec = new ProcedureExecutor<>(UTIL.getConfiguration(), new TestEnv(scheduler), procStore,
+        scheduler);
+    procExec.init(PROCEDURE_EXECUTOR_SLOTS, URGENT_PROCEDURE_EXECUTOR_SLOTS, false);
+    procExec.startWorkers();
+  }
+
+  @Test
+  public void test() throws Exception {
+    WaitingMetaProcedure waitingMetaProcedure = new WaitingMetaProcedure();
+    long waitProc = procExec.submitProcedure(waitingMetaProcedure);
+    // Make sure the only normal worker is busy with waitingMetaProcedure before submitting the
+    // meta procedure, so that only the urgent worker can pick the latter up.
+    UTIL.waitFor(5000, () -> procExec.getActiveExecutorCount() > 0);
+    MetaProcedure metaProcedure = new MetaProcedure();
+    long metaProc = procExec.submitProcedure(metaProcedure);
+    UTIL.waitFor(5000, () -> procExec.isFinished(metaProc));
+    UTIL.waitFor(5000, () -> procExec.isFinished(waitProc));
+    ProcedureTestingUtility.assertProcNotFailed(procExec, metaProc);
+    ProcedureTestingUtility.assertProcNotFailed(procExec, waitProc);
+  }
+}