You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/10/29 12:44:04 UTC
hbase git commit: HBASE-21375 Revisit the lock and queue
implementation in MasterProcedureScheduler
Repository: hbase
Updated Branches:
refs/heads/master c0b994b0c -> d5e4faacc
HBASE-21375 Revisit the lock and queue implementation in MasterProcedureScheduler
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d5e4faac
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d5e4faac
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d5e4faac
Branch: refs/heads/master
Commit: d5e4faacc354c1bc4d93efa71ca97ee3a056123e
Parents: c0b994b
Author: zhangduo <zh...@apache.org>
Authored: Mon Oct 29 19:56:49 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Mon Oct 29 19:56:49 2018 +0800
----------------------------------------------------------------------
.../hadoop/hbase/procedure2/LockAndQueue.java | 88 +++---
.../hadoop/hbase/procedure2/LockStatus.java | 45 ++-
.../hadoop/hbase/procedure2/Procedure.java | 13 +-
.../hbase/procedure2/ProcedureExecutor.java | 28 +-
.../procedure2/ProcedureTestingUtility.java | 2 +-
.../hbase/procedure2/TestLockAndQueue.java | 68 ++++
.../master/procedure/MasterProcedureEnv.java | 3 +-
.../procedure/MasterProcedureScheduler.java | 85 ++---
.../hadoop/hbase/master/procedure/Queue.java | 15 +-
.../hbase/master/procedure/SchemaLocking.java | 11 +-
.../hbase/master/procedure/TableQueue.java | 16 +-
...ProcedureSchedulerPerformanceEvaluation.java | 2 +-
.../procedure/TestMasterProcedureScheduler.java | 308 ++++++++++---------
...TestMasterProcedureSchedulerConcurrency.java | 2 +-
.../procedure/TestSchedulerQueueDeadLock.java | 276 +++++++++++++++++
15 files changed, 659 insertions(+), 303 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/d5e4faac/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java
index 4365a2c..bfeb739 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.procedure2;
+import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.yetus.audience.InterfaceAudience;
@@ -26,39 +27,36 @@ import org.apache.yetus.audience.InterfaceAudience;
* Locking for mutual exclusion between procedures. Used only by procedure framework internally.
* {@link LockAndQueue} has two purposes:
* <ol>
- * <li>Acquire/release exclusive/shared locks.</li>
- * <li>Maintains a list of procedures waiting on this lock.
- * {@link LockAndQueue} extends {@link ProcedureDeque} class. Blocked Procedures are added
- * to our super Deque. Using inheritance over composition to keep the Deque of waiting
- * Procedures is unusual, but we do it this way because in certain cases, there will be
- * millions of regions. This layout uses less memory.
+ * <li>Acquire/release exclusive/shared locks.</li>
+ * <li>Maintains a list of procedures waiting on this lock. {@link LockAndQueue} extends
+ * {@link ProcedureDeque} class. Blocked Procedures are added to our super Deque. Using inheritance
+ * over composition to keep the Deque of waiting Procedures is unusual, but we do it this way
+ * because in certain cases, there will be millions of regions. This layout uses less memory.
* </ol>
- *
- * <p>NOT thread-safe. Needs external concurrency control: e.g. uses in MasterProcedureScheduler are
- * guarded by schedLock().
- * <br>
+ * <p/>
+ * NOT thread-safe. Needs external concurrency control: e.g. uses in MasterProcedureScheduler are
+ * guarded by schedLock(). <br/>
* There is no need of 'volatile' keyword for member variables because of memory synchronization
* guarantees of locks (see 'Memory Synchronization',
- * http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/Lock.html)
- * <br>
- * We do not implement Lock interface because we need exclusive and shared locking, and also
- * because try-lock functions require procedure id.
- * <br>
+ * http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/Lock.html) <br/>
+ * We do not implement Lock interface because we need exclusive and shared locking, and also because
+ * try-lock functions require procedure id. <br/>
* We do not use ReentrantReadWriteLock directly because of its high memory overhead.
*/
@InterfaceAudience.Private
public class LockAndQueue implements LockStatus {
+
+ private final Function<Long, Procedure<?>> procedureRetriever;
private final ProcedureDeque queue = new ProcedureDeque();
private Procedure<?> exclusiveLockOwnerProcedure = null;
private int sharedLock = 0;
// ======================================================================
- // Lock Status
+ // Lock Status
// ======================================================================
- @Override
- public boolean isLocked() {
- return hasExclusiveLock() || sharedLock > 0;
+ public LockAndQueue(Function<Long, Procedure<?>> procedureRetriever) {
+ this.procedureRetriever = procedureRetriever;
}
@Override
@@ -67,21 +65,31 @@ public class LockAndQueue implements LockStatus {
}
@Override
- public boolean isLockOwner(long procId) {
- return getExclusiveLockProcIdOwner() == procId;
- }
-
- @Override
- public boolean hasParentLock(Procedure<?> proc) {
- // TODO: need to check all the ancestors. need to passed in the procedures
- // to find the ancestors.
- return proc.hasParent() &&
- (isLockOwner(proc.getParentProcId()) || isLockOwner(proc.getRootProcId()));
- }
-
- @Override
public boolean hasLockAccess(Procedure<?> proc) {
- return isLockOwner(proc.getProcId()) || hasParentLock(proc);
+ if (exclusiveLockOwnerProcedure == null) {
+ return false;
+ }
+ long lockOwnerId = exclusiveLockOwnerProcedure.getProcId();
+ if (proc.getProcId() == lockOwnerId) {
+ return true;
+ }
+ if (!proc.hasParent()) {
+ return false;
+ }
+ // fast path to check root procedure
+ if (proc.getRootProcId() == lockOwnerId) {
+ return true;
+ }
+ // check ancestors
+ for (Procedure<?> p = proc;;) {
+ if (p.getParentProcId() == lockOwnerId) {
+ return true;
+ }
+ p = procedureRetriever.apply(p.getParentProcId());
+ if (p == null || !p.hasParent()) {
+ return false;
+ }
+ }
}
@Override
@@ -90,21 +98,12 @@ public class LockAndQueue implements LockStatus {
}
@Override
- public long getExclusiveLockProcIdOwner() {
- if (exclusiveLockOwnerProcedure == null) {
- return Long.MIN_VALUE;
- } else {
- return exclusiveLockOwnerProcedure.getProcId();
- }
- }
-
- @Override
public int getSharedLockCount() {
return sharedLock;
}
// ======================================================================
- // try/release Shared/Exclusive lock
+ // try/release Shared/Exclusive lock
// ======================================================================
/**
@@ -143,7 +142,8 @@ public class LockAndQueue implements LockStatus {
* @return whether we should wake the procedures waiting on the lock here.
*/
public boolean releaseExclusiveLock(Procedure<?> proc) {
- if (!isLockOwner(proc.getProcId())) {
+ if (exclusiveLockOwnerProcedure == null ||
+ exclusiveLockOwnerProcedure.getProcId() != proc.getProcId()) {
// We are not the lock owner, it is probably inherited from the parent procedures.
return false;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d5e4faac/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockStatus.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockStatus.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockStatus.java
index 031b8bb..33d2a38 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockStatus.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockStatus.java
@@ -21,25 +21,52 @@ package org.apache.hadoop.hbase.procedure2;
import org.apache.yetus.audience.InterfaceAudience;
/**
- * Interface to get status of a Lock without getting access to acquire/release lock.
- * Currently used in MasterProcedureScheduler where we want to give Queues access to lock's
- * status for scheduling purposes, but not the ability to acquire/release it.
+ * Interface to get status of a Lock without getting access to acquire/release lock. Currently used
+ * in MasterProcedureScheduler where we want to give Queues access to lock's status for scheduling
+ * purposes, but not the ability to acquire/release it.
*/
@InterfaceAudience.Private
public interface LockStatus {
- boolean isLocked();
- boolean hasExclusiveLock();
-
- boolean isLockOwner(long procId);
+ /**
+ * Return whether this lock has already been held.
+ * <p/>
+ * Notice that holding either the exclusive lock or a shared lock counts as locked, i.e., this
+ * method is usually equivalent to {@code hasExclusiveLock() || getSharedLockCount() > 0}.
+ */
+ default boolean isLocked() {
+ return hasExclusiveLock() || getSharedLockCount() > 0;
+ }
- boolean hasParentLock(Procedure<?> proc);
+ /**
+ * Whether the exclusive lock has been held.
+ */
+ boolean hasExclusiveLock();
+ /**
+ * Return true if the procedure itself holds the exclusive lock, or any ancestor of the given
+ * procedure holds the exclusive lock.
+ */
boolean hasLockAccess(Procedure<?> proc);
+ /**
+ * Get the procedure which holds the exclusive lock.
+ */
Procedure<?> getExclusiveLockOwnerProcedure();
- long getExclusiveLockProcIdOwner();
+ /**
+ * Return the id of the procedure which holds the exclusive lock, if one exists, or a negative
+ * value if no one holds the exclusive lock.
+ * <p/>
+ * Notice that, in HBase, we assume that the procedure id is positive, or at least non-negative.
+ */
+ default long getExclusiveLockProcIdOwner() {
+ Procedure<?> proc = getExclusiveLockOwnerProcedure();
+ return proc != null ? proc.getProcId() : -1L;
+ }
+ /**
+ * Get the number of procedures which hold the shared lock.
+ */
int getSharedLockCount();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d5e4faac/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index a85ccb1..9934dc3 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -329,7 +329,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
* @see #holdLock(Object)
* @return true if the procedure has the lock, false otherwise.
*/
- @VisibleForTesting
public final boolean hasLock() {
return locked;
}
@@ -711,12 +710,20 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
/**
* Will only be called when loading procedures from procedure store, where we need to record
* whether the procedure has already held a lock. Later we will call
- * {@link #doAcquireLock(Object, ProcedureStore)} to actually acquire the lock.
+ * {@link #restoreLock(Object, ProcedureStore)} to actually acquire the lock.
*/
final void lockedWhenLoading() {
this.lockedWhenLoading = true;
}
+ /**
+ * Can only be called when restarting, before the procedure is actually executed, because once we
+ * call the {@link #doAcquireLock(Object, ProcedureStore)} method, we will reset
+ * {@link #lockedWhenLoading} to false.
+ * <p/>
+ * Now it is only used in the ProcedureScheduler to determine whether we should put a Procedure in
+ * front of a queue.
+ */
public boolean isLockedWhenLoading() {
return lockedWhenLoading;
}
@@ -990,7 +997,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
// this can happen if the parent stores the sub procedures but before it can
// release its lock, the master restarts
if (getState() == ProcedureState.WAITING && !holdLock(env)) {
- LOG.debug("{} is in WAITING STATE, and holdLock= false, skip acquiring lock.", this);
+ LOG.debug("{} is in WAITING STATE, and holdLock=false, skip acquiring lock.", this);
lockedWhenLoading = false;
return;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d5e4faac/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 01d2b2b..9515a7a 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -637,14 +637,15 @@ public class ProcedureExecutor<TEnvironment> {
proc.afterReplay(getEnvironment());
}
});
+ // 4. restore locks
+ restoreLocks();
- // 4. Push the procedures to the timeout executor
+ // 5. Push the procedures to the timeout executor
waitingTimeoutList.forEach(proc -> {
proc.afterReplay(getEnvironment());
timeoutExecutor.add(proc);
});
- // 5. restore locks
- restoreLocks();
+
// 6. Push the procedure to the scheduler
failedList.forEach(scheduler::addBack);
runnableList.forEach(p -> {
@@ -652,26 +653,7 @@ public class ProcedureExecutor<TEnvironment> {
if (!p.hasParent()) {
sendProcedureLoadedNotification(p.getProcId());
}
- // If the procedure holds the lock, put the procedure in front
- // If its parent holds the lock, put the procedure in front
- // TODO. Is that possible that its ancestor holds the lock?
- // For now, the deepest procedure hierarchy is:
- // ModifyTableProcedure -> ReopenTableProcedure ->
- // MoveTableProcedure -> Unassign/AssignProcedure
- // But ModifyTableProcedure and ReopenTableProcedure won't hold the lock
- // So, check parent lock is enough(a tricky case is resovled by HBASE-21384).
- // If some one change or add new procedures making 'grandpa' procedure
- // holds the lock, but parent procedure don't hold the lock, there will
- // be a problem here. We have to check one procedure's ancestors.
- // And we need to change LockAndQueue.hasParentLock(Procedure<?> proc) method
- // to check all ancestors too.
- if (p.isLockedWhenLoading() || (p.hasParent() && procedures
- .get(p.getParentProcId()).isLockedWhenLoading())) {
- scheduler.addFront(p, false);
- } else {
- // if it was not, it can wait.
- scheduler.addBack(p, false);
- }
+ scheduler.addBack(p);
});
// After all procedures put into the queue, signal the worker threads.
// Otherwise, there is a race condition. See HBASE-21364.
http://git-wip-us.apache.org/repos/asf/hbase/blob/d5e4faac/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index 95e0320..5a7a664 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -403,7 +403,7 @@ public class ProcedureTestingUtility {
public NoopProcedure() {}
@Override
- protected Procedure[] execute(TEnv env)
+ protected Procedure<TEnv>[] execute(TEnv env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
return null;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d5e4faac/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestLockAndQueue.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestLockAndQueue.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestLockAndQueue.java
new file mode 100644
index 0000000..9f24403
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestLockAndQueue.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MasterTests.class, SmallTests.class })
+public class TestLockAndQueue {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestLockAndQueue.class);
+
+ @Test
+ public void testHasLockAccess() {
+ Map<Long, NoopProcedure<Void>> procMap = new HashMap<>();
+ for (long i = 1; i <= 10; i++) {
+ NoopProcedure<Void> proc = new NoopProcedure<>();
+ proc.setProcId(i);
+ if (i > 1) {
+ proc.setParentProcId(i - 1);
+ proc.setRootProcId(1);
+ }
+ procMap.put(i, proc);
+ }
+ LockAndQueue laq = new LockAndQueue(procMap::get);
+ for (long i = 1; i <= 10; i++) {
+ assertFalse(laq.hasLockAccess(procMap.get(i)));
+ }
+ for (long i = 1; i <= 10; i++) {
+ NoopProcedure<Void> procHasLock = procMap.get(i);
+ laq.tryExclusiveLock(procHasLock);
+ for (long j = 1; j < i; j++) {
+ assertFalse(laq.hasLockAccess(procMap.get(j)));
+ }
+ for (long j = i; j <= 10; j++) {
+ assertTrue(laq.hasLockAccess(procMap.get(j)));
+ }
+ laq.releaseExclusiveLock(procHasLock);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d5e4faac/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
index cd40234..4fcf7e0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
@@ -83,7 +83,8 @@ public class MasterProcedureEnv implements ConfigurationObserver {
public MasterProcedureEnv(final MasterServices master,
final RSProcedureDispatcher remoteDispatcher) {
this.master = master;
- this.procSched = new MasterProcedureScheduler();
+ this.procSched = new MasterProcedureScheduler(
+ procId -> master.getMasterProcedureExecutor().getProcedure(procId));
this.remoteDispatcher = remoteDispatcher;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d5e4faac/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
index 5ee5f49..6e15acb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.function.Function;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
@@ -28,6 +29,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler;
import org.apache.hadoop.hbase.procedure2.LockAndQueue;
+import org.apache.hadoop.hbase.procedure2.LockStatus;
import org.apache.hadoop.hbase.procedure2.LockedResource;
import org.apache.hadoop.hbase.procedure2.LockedResourceType;
import org.apache.hadoop.hbase.procedure2.Procedure;
@@ -113,11 +115,15 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
private PeerQueue peerMap = null;
private MetaQueue metaMap = null;
- private final SchemaLocking locking = new SchemaLocking();
+ private final SchemaLocking locking;
+
+ public MasterProcedureScheduler(Function<Long, Procedure<?>> procedureRetriever) {
+ locking = new SchemaLocking(procedureRetriever);
+ }
@Override
public void yield(final Procedure proc) {
- push(proc, isTableProcedure(proc), true);
+ push(proc, false, true);
}
@Override
@@ -141,18 +147,17 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
}
}
- private <T extends Comparable<T>> void doAdd(final FairQueue<T> fairq,
- final Queue<T> queue, final Procedure<?> proc, final boolean addFront) {
- if (!queue.getLockStatus().hasExclusiveLock()) {
- // if the queue was not remove for an xlock execution,put the queue back into execution
- queue.add(proc, addFront);
- addToRunQueue(fairq, queue);
- } else if (queue.getLockStatus().hasLockAccess(proc)) {
- // always add it to front as the have the lock access.
- queue.add(proc, true);
+ private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue,
+ Procedure<?> proc, boolean addFront) {
+ queue.add(proc, addFront);
+ // If any of the following conditions holds, we put the queue back into the run queue:
+ // 1. The procedure already holds the lock, or its lock was restored when restarting,
+ // which means it can be executed immediately.
+ // 2. No one holds the exclusive lock for this queue.
+ // 3. The given procedure has exclusive lock access for this queue.
+ if (proc.hasLock() || proc.isLockedWhenLoading() || !queue.getLockStatus().hasExclusiveLock() ||
+ queue.getLockStatus().hasLockAccess(proc)) {
addToRunQueue(fairq, queue);
- } else {
- queue.add(proc, addFront);
}
}
@@ -181,38 +186,40 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
return pollResult;
}
- private <T extends Comparable<T>> Procedure<?> doPoll(final FairQueue<T> fairq) {
- final Queue<T> rq = fairq.poll();
- if (rq == null || !rq.isAvailable()) {
- return null;
+ private <T extends Comparable<T>> boolean isLockReady(Procedure<?> proc, Queue<T> rq) {
+ LockStatus s = rq.getLockStatus();
+ // if we have the lock access, we are ready
+ if (s.hasLockAccess(proc)) {
+ return true;
}
+ boolean xlockReq = rq.requireExclusiveLock(proc);
+ // if we need to hold the xlock, then we need to make sure that no one holds any lock, including
+ // the shared lock, otherwise, we just need to make sure that no one holds the xlock
+ return xlockReq ? !s.isLocked() : !s.hasExclusiveLock();
+ }
- final Procedure<?> pollResult = rq.peek();
- if (pollResult == null) {
- return null;
- }
- final boolean xlockReq = rq.requireExclusiveLock(pollResult);
- if (xlockReq && rq.getLockStatus().isLocked() && !rq.getLockStatus().hasLockAccess(pollResult)) {
- // someone is already holding the lock (e.g. shared lock). avoid a yield
- removeFromRunQueue(fairq, rq);
+ private <T extends Comparable<T>> Procedure<?> doPoll(final FairQueue<T> fairq) {
+ Queue<T> rq = fairq.poll();
+ if (rq == null || !rq.isAvailable()) {
return null;
}
-
- rq.poll();
- if (rq.isEmpty() || xlockReq) {
- removeFromRunQueue(fairq, rq);
- } else if (rq.getLockStatus().hasParentLock(pollResult)) {
- // if the rq is in the fairq because of runnable child
- // check if the next procedure is still a child.
- // if not, remove the rq from the fairq and go back to the xlock state
- Procedure<?> nextProc = rq.peek();
- if (nextProc != null && !Procedure.haveSameParent(nextProc, pollResult)
- && nextProc.getRootProcId() != pollResult.getRootProcId()) {
- removeFromRunQueue(fairq, rq);
+ // loop until we find a procedure which is ready to run; if we have checked all the
+ // procedures without finding one, we give up and remove the queue from the run queue.
+ for (int i = 0, n = rq.size(); i < n; i++) {
+ Procedure<?> proc = rq.poll();
+ if (isLockReady(proc, rq)) {
+ // the queue is empty, remove from run queue
+ if (rq.isEmpty()) {
+ removeFromRunQueue(fairq, rq);
+ }
+ return proc;
}
+ // we are not ready to run, add back and try the next procedure
+ rq.add(proc, false);
}
-
- return pollResult;
+ // no procedure is ready for execution, remove from run queue
+ removeFromRunQueue(fairq, rq);
+ return null;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/d5e4faac/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java
index 43e66d0..c80e98f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java
@@ -60,21 +60,8 @@ abstract class Queue<TKey extends Comparable<TKey>> extends AvlLinkedNode<Queue<
return lockStatus;
}
- // This should go away when we have the new AM and its events
- // and we move xlock to the lock-event-queue.
public boolean isAvailable() {
- if (isEmpty()) {
- return false;
- }
- if (getLockStatus().hasExclusiveLock()) {
- // If we have an exclusive lock already taken, only child of the lock owner can be executed
- // And now we will restore locks when master restarts, so it is possible that the procedure
- // which is holding the lock is also in the queue, so we need to use hasLockAccess here
- // instead of hasParentLock
- Procedure<?> nextProc = peek();
- return nextProc != null && getLockStatus().hasLockAccess(nextProc);
- }
- return true;
+ return !isEmpty();
}
// ======================================================================
http://git-wip-us.apache.org/repos/asf/hbase/blob/d5e4faac/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java
index afd9185..70e7c59 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java
@@ -45,18 +45,25 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
*/
@InterfaceAudience.Private
class SchemaLocking {
+
+ private final Function<Long, Procedure<?>> procedureRetriever;
private final Map<ServerName, LockAndQueue> serverLocks = new HashMap<>();
private final Map<String, LockAndQueue> namespaceLocks = new HashMap<>();
private final Map<TableName, LockAndQueue> tableLocks = new HashMap<>();
// Single map for all regions irrespective of tables. Key is encoded region name.
private final Map<String, LockAndQueue> regionLocks = new HashMap<>();
private final Map<String, LockAndQueue> peerLocks = new HashMap<>();
- private final LockAndQueue metaLock = new LockAndQueue();
+ private final LockAndQueue metaLock;
+
+ public SchemaLocking(Function<Long, Procedure<?>> procedureRetriever) {
+ this.procedureRetriever = procedureRetriever;
+ this.metaLock = new LockAndQueue(procedureRetriever);
+ }
private <T> LockAndQueue getLock(Map<T, LockAndQueue> map, T key) {
LockAndQueue lock = map.get(key);
if (lock == null) {
- lock = new LockAndQueue();
+ lock = new LockAndQueue(procedureRetriever);
map.put(key, lock);
}
return lock;
http://git-wip-us.apache.org/repos/asf/hbase/blob/d5e4faac/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java
index 106dfc3..81c883b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java
@@ -34,21 +34,7 @@ class TableQueue extends Queue<TableName> {
@Override
public boolean isAvailable() {
- // if there are no items in the queue, or the namespace is locked.
- // we can't execute operation on this table
- if (isEmpty() || namespaceLockStatus.hasExclusiveLock()) {
- return false;
- }
-
- if (getLockStatus().hasExclusiveLock()) {
- // if we have an exclusive lock already taken
- // only child of the lock owner can be executed
- final Procedure<?> nextProc = peek();
- return nextProc != null && getLockStatus().hasLockAccess(nextProc);
- }
-
- // no xlock
- return true;
+ return !isEmpty() && !namespaceLockStatus.hasExclusiveLock();
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/d5e4faac/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureSchedulerPerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureSchedulerPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureSchedulerPerformanceEvaluation.java
index 767f30f..ae874d5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureSchedulerPerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureSchedulerPerformanceEvaluation.java
@@ -256,7 +256,7 @@ public class MasterProcedureSchedulerPerformanceEvaluation extends AbstractHBase
@Override
protected int doWork() throws Exception {
- procedureScheduler = new MasterProcedureScheduler();
+ procedureScheduler = new MasterProcedureScheduler(pid -> null);
procedureScheduler.start();
setupOperations();
http://git-wip-us.apache.org/repos/asf/hbase/blob/d5e4faac/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
index 957c583..66e72d0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
@@ -29,7 +29,6 @@ import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
@@ -55,12 +54,12 @@ import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@Category({MasterTests.class, SmallTests.class})
+@Category({ MasterTests.class, SmallTests.class })
public class TestMasterProcedureScheduler {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestMasterProcedureScheduler.class);
+ HBaseClassTestRule.forClass(TestMasterProcedureScheduler.class);
private static final Logger LOG = LoggerFactory.getLogger(TestMasterProcedureScheduler.class);
@@ -71,7 +70,7 @@ public class TestMasterProcedureScheduler {
@Before
public void setUp() throws IOException {
- queue = new MasterProcedureScheduler();
+ queue = new MasterProcedureScheduler(pid -> null);
queue.start();
}
@@ -104,9 +103,9 @@ public class TestMasterProcedureScheduler {
for (int j = 1; j <= NUM_ITEMS; ++j) {
for (int i = 1; i <= NUM_TABLES; ++i) {
- Procedure proc = queue.poll();
+ Procedure<?> proc = queue.poll();
assertTrue(proc != null);
- TableName tableName = ((TestTableProcedure)proc).getTableName();
+ TableName tableName = ((TestTableProcedure) proc).getTableName();
queue.waitTableExclusiveLock(proc, tableName);
queue.wakeTableExclusiveLock(proc, tableName);
queue.completionCleanup(proc);
@@ -118,32 +117,32 @@ public class TestMasterProcedureScheduler {
for (int i = 1; i <= NUM_TABLES; ++i) {
final TableName tableName = TableName.valueOf(String.format("test-%04d", i));
- final TestTableProcedure dummyProc = new TestTableProcedure(100, tableName,
- TableProcedureInterface.TableOperationType.DELETE);
+ final TestTableProcedure dummyProc =
+ new TestTableProcedure(100, tableName, TableProcedureInterface.TableOperationType.DELETE);
// complete the table deletion
assertTrue(queue.markTableAsDeleted(tableName, dummyProc));
}
}
/**
- * Check that the table queue is not deletable until every procedure
- * in-progress is completed (this is a special case for write-locks).
+ * Check that the table queue is not deletable until every procedure in-progress is completed
+ * (this is a special case for write-locks).
*/
@Test
public void testCreateDeleteTableOperationsWithWriteLock() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
- final TestTableProcedure dummyProc = new TestTableProcedure(100, tableName,
- TableProcedureInterface.TableOperationType.DELETE);
+ final TestTableProcedure dummyProc =
+ new TestTableProcedure(100, tableName, TableProcedureInterface.TableOperationType.DELETE);
- queue.addBack(new TestTableProcedure(1, tableName,
- TableProcedureInterface.TableOperationType.EDIT));
+ queue.addBack(
+ new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.EDIT));
// table can't be deleted because one item is in the queue
assertFalse(queue.markTableAsDeleted(tableName, dummyProc));
// fetch item and take a lock
- Procedure proc = queue.poll();
+ Procedure<?> proc = queue.poll();
assertEquals(1, proc.getProcId());
// take the xlock
assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
@@ -157,29 +156,30 @@ public class TestMasterProcedureScheduler {
}
/**
- * Check that the table queue is not deletable until every procedure
- * in-progress is completed (this is a special case for read-locks).
+ * Check that the table queue is not deletable until every procedure in-progress is completed
+ * (this is a special case for read-locks).
*/
@Test
public void testCreateDeleteTableOperationsWithReadLock() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
final int nitems = 2;
- final TestTableProcedure dummyProc = new TestTableProcedure(100, tableName,
- TableProcedureInterface.TableOperationType.DELETE);
+ final TestTableProcedure dummyProc =
+ new TestTableProcedure(100, tableName, TableProcedureInterface.TableOperationType.DELETE);
for (int i = 1; i <= nitems; ++i) {
- queue.addBack(new TestTableProcedure(i, tableName,
- TableProcedureInterface.TableOperationType.READ));
+ queue.addBack(
+ new TestTableProcedure(i, tableName, TableProcedureInterface.TableOperationType.READ));
}
// table can't be deleted because one item is in the queue
assertFalse(queue.markTableAsDeleted(tableName, dummyProc));
- Procedure[] procs = new Procedure[nitems];
+ Procedure<?>[] procs = new Procedure[nitems];
for (int i = 0; i < nitems; ++i) {
// fetch item and take a lock
- Procedure proc = procs[i] = queue.poll();
+ Procedure<?> proc = queue.poll();
+ procs[i] = proc;
assertEquals(i + 1, proc.getProcId());
// take the rlock
assertEquals(false, queue.waitTableSharedLock(proc, tableName));
@@ -206,19 +206,15 @@ public class TestMasterProcedureScheduler {
@Test
public void testVerifyRwLocks() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
- queue.addBack(new TestTableProcedure(1, tableName,
- TableProcedureInterface.TableOperationType.EDIT));
- queue.addBack(new TestTableProcedure(2, tableName,
- TableProcedureInterface.TableOperationType.READ));
- queue.addBack(new TestTableProcedure(3, tableName,
- TableProcedureInterface.TableOperationType.EDIT));
- queue.addBack(new TestTableProcedure(4, tableName,
- TableProcedureInterface.TableOperationType.READ));
- queue.addBack(new TestTableProcedure(5, tableName,
- TableProcedureInterface.TableOperationType.READ));
+ queue.addBack(
+ new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.EDIT));
+ queue.addBack(
+ new TestTableProcedure(2, tableName, TableProcedureInterface.TableOperationType.READ));
+ queue.addBack(
+ new TestTableProcedure(3, tableName, TableProcedureInterface.TableOperationType.EDIT));
// Fetch the 1st item and take the write lock
- Procedure proc = queue.poll();
+ Procedure<?> proc = queue.poll();
assertEquals(1, proc.getProcId());
assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
@@ -229,7 +225,7 @@ public class TestMasterProcedureScheduler {
queue.wakeTableExclusiveLock(proc, tableName);
// Fetch the 2nd item and take the read lock
- Procedure rdProc = queue.poll();
+ Procedure<?> rdProc = queue.poll();
assertEquals(2, rdProc.getProcId());
assertEquals(false, queue.waitTableSharedLock(rdProc, tableName));
@@ -239,8 +235,13 @@ public class TestMasterProcedureScheduler {
// release the rdlock of item 2 and take the wrlock for the 3d item
queue.wakeTableSharedLock(rdProc, tableName);
+ queue.addBack(
+ new TestTableProcedure(4, tableName, TableProcedureInterface.TableOperationType.READ));
+ queue.addBack(
+ new TestTableProcedure(5, tableName, TableProcedureInterface.TableOperationType.READ));
+
// Fetch the 3rd item and take the write lock
- Procedure wrProc = queue.poll();
+ Procedure<?> wrProc = queue.poll();
assertEquals(false, queue.waitTableExclusiveLock(wrProc, tableName));
// Fetch 4th item and verify that the lock can't be acquired
@@ -255,7 +256,7 @@ public class TestMasterProcedureScheduler {
assertEquals(false, queue.waitTableSharedLock(rdProc, tableName));
// Fetch the 4th item and take the read lock
- Procedure rdProc2 = queue.poll();
+ Procedure<?> rdProc2 = queue.poll();
assertEquals(5, rdProc2.getProcId());
assertEquals(false, queue.waitTableSharedLock(rdProc2, tableName));
@@ -274,22 +275,22 @@ public class TestMasterProcedureScheduler {
String nsName2 = "ns2";
TableName tableName1 = TableName.valueOf(nsName1, name.getMethodName());
TableName tableName2 = TableName.valueOf(nsName2, name.getMethodName());
- queue.addBack(new TestNamespaceProcedure(1, nsName1,
- TableProcedureInterface.TableOperationType.EDIT));
- queue.addBack(new TestTableProcedure(2, tableName1,
- TableProcedureInterface.TableOperationType.EDIT));
- queue.addBack(new TestTableProcedure(3, tableName2,
- TableProcedureInterface.TableOperationType.EDIT));
- queue.addBack(new TestNamespaceProcedure(4, nsName2,
- TableProcedureInterface.TableOperationType.EDIT));
+ queue.addBack(
+ new TestNamespaceProcedure(1, nsName1, TableProcedureInterface.TableOperationType.EDIT));
+ queue.addBack(
+ new TestTableProcedure(2, tableName1, TableProcedureInterface.TableOperationType.EDIT));
+ queue.addBack(
+ new TestTableProcedure(3, tableName2, TableProcedureInterface.TableOperationType.EDIT));
+ queue.addBack(
+ new TestNamespaceProcedure(4, nsName2, TableProcedureInterface.TableOperationType.EDIT));
// Fetch the 1st item and take the write lock
- Procedure procNs1 = queue.poll();
+ Procedure<?> procNs1 = queue.poll();
assertEquals(1, procNs1.getProcId());
assertFalse(queue.waitNamespaceExclusiveLock(procNs1, nsName1));
// namespace table has higher priority so we still return procedure for it
- Procedure procNs2 = queue.poll();
+ Procedure<?> procNs2 = queue.poll();
assertEquals(4, procNs2.getProcId());
assertFalse(queue.waitNamespaceExclusiveLock(procNs2, nsName2));
queue.wakeNamespaceExclusiveLock(procNs2, nsName2);
@@ -324,13 +325,13 @@ public class TestMasterProcedureScheduler {
public void testVerifyNamespaceXLock() throws Exception {
String nsName = "ns1";
TableName tableName = TableName.valueOf(nsName, name.getMethodName());
- queue.addBack(new TestNamespaceProcedure(1, nsName,
- TableProcedureInterface.TableOperationType.CREATE));
- queue.addBack(new TestTableProcedure(2, tableName,
- TableProcedureInterface.TableOperationType.READ));
+ queue.addBack(
+ new TestNamespaceProcedure(1, nsName, TableProcedureInterface.TableOperationType.CREATE));
+ queue.addBack(
+ new TestTableProcedure(2, tableName, TableProcedureInterface.TableOperationType.READ));
// Fetch the ns item and take the xlock
- Procedure proc = queue.poll();
+ Procedure<?> proc = queue.poll();
assertEquals(1, proc.getProcId());
assertEquals(false, queue.waitNamespaceExclusiveLock(proc, nsName));
@@ -349,17 +350,16 @@ public class TestMasterProcedureScheduler {
@Test
public void testXLockWaitingForExecutingSharedLockToRelease() {
final TableName tableName = TableName.valueOf(name.getMethodName());
- final HRegionInfo regionA = new HRegionInfo(tableName, Bytes.toBytes("a"), Bytes.toBytes("b"));
+ final RegionInfo regionA = RegionInfoBuilder.newBuilder(tableName)
+ .setStartKey(Bytes.toBytes("a")).setEndKey(Bytes.toBytes("b")).build();
queue.addBack(new TestRegionProcedure(1, tableName,
- TableProcedureInterface.TableOperationType.REGION_ASSIGN, regionA));
- queue.addBack(new TestTableProcedure(2, tableName,
- TableProcedureInterface.TableOperationType.EDIT));
- queue.addBack(new TestRegionProcedure(3, tableName,
- TableProcedureInterface.TableOperationType.REGION_UNASSIGN, regionA));
+ TableProcedureInterface.TableOperationType.REGION_ASSIGN, regionA));
+ queue.addBack(
+ new TestTableProcedure(2, tableName, TableProcedureInterface.TableOperationType.EDIT));
// Fetch the 1st item and take the shared lock
- Procedure proc = queue.poll();
+ Procedure<?> proc = queue.poll();
assertEquals(1, proc.getProcId());
assertEquals(false, queue.waitRegion(proc, regionA));
@@ -374,6 +374,9 @@ public class TestMasterProcedureScheduler {
assertEquals(2, proc.getProcId());
assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
+ queue.addBack(new TestRegionProcedure(3, tableName,
+ TableProcedureInterface.TableOperationType.REGION_UNASSIGN, regionA));
+
// everything is locked by the table operation
assertEquals(null, queue.poll(0));
@@ -393,23 +396,26 @@ public class TestMasterProcedureScheduler {
@Test
public void testVerifyRegionLocks() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
- final HRegionInfo regionA = new HRegionInfo(tableName, Bytes.toBytes("a"), Bytes.toBytes("b"));
- final HRegionInfo regionB = new HRegionInfo(tableName, Bytes.toBytes("b"), Bytes.toBytes("c"));
- final HRegionInfo regionC = new HRegionInfo(tableName, Bytes.toBytes("c"), Bytes.toBytes("d"));
-
- queue.addBack(new TestTableProcedure(1, tableName,
- TableProcedureInterface.TableOperationType.EDIT));
+ final RegionInfo regionA = RegionInfoBuilder.newBuilder(tableName)
+ .setStartKey(Bytes.toBytes("a")).setEndKey(Bytes.toBytes("b")).build();
+ final RegionInfo regionB = RegionInfoBuilder.newBuilder(tableName)
+ .setStartKey(Bytes.toBytes("b")).setEndKey(Bytes.toBytes("c")).build();
+ final RegionInfo regionC = RegionInfoBuilder.newBuilder(tableName)
+ .setStartKey(Bytes.toBytes("c")).setEndKey(Bytes.toBytes("d")).build();
+
+ queue.addBack(
+ new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.EDIT));
queue.addBack(new TestRegionProcedure(2, tableName,
- TableProcedureInterface.TableOperationType.REGION_MERGE, regionA, regionB));
+ TableProcedureInterface.TableOperationType.REGION_MERGE, regionA, regionB));
queue.addBack(new TestRegionProcedure(3, tableName,
- TableProcedureInterface.TableOperationType.REGION_SPLIT, regionA));
+ TableProcedureInterface.TableOperationType.REGION_SPLIT, regionA));
queue.addBack(new TestRegionProcedure(4, tableName,
- TableProcedureInterface.TableOperationType.REGION_SPLIT, regionB));
+ TableProcedureInterface.TableOperationType.REGION_SPLIT, regionB));
queue.addBack(new TestRegionProcedure(5, tableName,
- TableProcedureInterface.TableOperationType.REGION_UNASSIGN, regionC));
+ TableProcedureInterface.TableOperationType.REGION_UNASSIGN, regionC));
// Fetch the 1st item and take the write lock
- Procedure proc = queue.poll();
+ Procedure<?> proc = queue.poll();
assertEquals(1, proc.getProcId());
assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
@@ -420,24 +426,24 @@ public class TestMasterProcedureScheduler {
queue.wakeTableExclusiveLock(proc, tableName);
// Fetch the 2nd item and the the lock on regionA and regionB
- Procedure mergeProc = queue.poll();
+ Procedure<?> mergeProc = queue.poll();
assertEquals(2, mergeProc.getProcId());
assertEquals(false, queue.waitRegions(mergeProc, tableName, regionA, regionB));
// Fetch the 3rd item and the try to lock region A which will fail
// because already locked. this procedure will go in waiting.
// (this stuff will be explicit until we get rid of the zk-lock)
- Procedure procA = queue.poll();
+ Procedure<?> procA = queue.poll();
assertEquals(3, procA.getProcId());
assertEquals(true, queue.waitRegions(procA, tableName, regionA));
// Fetch the 4th item, same story as the 3rd
- Procedure procB = queue.poll();
+ Procedure<?> procB = queue.poll();
assertEquals(4, procB.getProcId());
assertEquals(true, queue.waitRegions(procB, tableName, regionB));
// Fetch the 5th item, since it is a non-locked region we are able to execute it
- Procedure procC = queue.poll();
+ Procedure<?> procC = queue.poll();
assertEquals(5, procC.getProcId());
assertEquals(false, queue.waitRegions(procC, tableName, regionC));
@@ -466,15 +472,18 @@ public class TestMasterProcedureScheduler {
@Test
public void testVerifySubProcRegionLocks() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
- final HRegionInfo regionA = new HRegionInfo(tableName, Bytes.toBytes("a"), Bytes.toBytes("b"));
- final HRegionInfo regionB = new HRegionInfo(tableName, Bytes.toBytes("b"), Bytes.toBytes("c"));
- final HRegionInfo regionC = new HRegionInfo(tableName, Bytes.toBytes("c"), Bytes.toBytes("d"));
+ final RegionInfo regionA = RegionInfoBuilder.newBuilder(tableName)
+ .setStartKey(Bytes.toBytes("a")).setEndKey(Bytes.toBytes("b")).build();
+ final RegionInfo regionB = RegionInfoBuilder.newBuilder(tableName)
+ .setStartKey(Bytes.toBytes("b")).setEndKey(Bytes.toBytes("c")).build();
+ final RegionInfo regionC = RegionInfoBuilder.newBuilder(tableName)
+ .setStartKey(Bytes.toBytes("c")).setEndKey(Bytes.toBytes("d")).build();
- queue.addBack(new TestTableProcedure(1, tableName,
- TableProcedureInterface.TableOperationType.ENABLE));
+ queue.addBack(
+ new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.ENABLE));
// Fetch the 1st item from the queue, "the root procedure" and take the table lock
- Procedure rootProc = queue.poll();
+ Procedure<?> rootProc = queue.poll();
assertEquals(1, rootProc.getProcId());
assertEquals(false, queue.waitTableExclusiveLock(rootProc, tableName));
assertEquals(null, queue.poll(0));
@@ -482,14 +491,13 @@ public class TestMasterProcedureScheduler {
// Execute the 1st step of the root-proc.
// we should get 3 sub-proc back, one for each region.
// (this step is done by the executor/rootProc, we are simulating it)
- Procedure[] subProcs = new Procedure[] {
+ Procedure<?>[] subProcs = new Procedure[] {
new TestRegionProcedure(1, 2, tableName,
TableProcedureInterface.TableOperationType.REGION_EDIT, regionA),
new TestRegionProcedure(1, 3, tableName,
TableProcedureInterface.TableOperationType.REGION_EDIT, regionB),
new TestRegionProcedure(1, 4, tableName,
- TableProcedureInterface.TableOperationType.REGION_EDIT, regionC),
- };
+ TableProcedureInterface.TableOperationType.REGION_EDIT, regionC), };
// at this point the rootProc is going in a waiting state
// and the sub-procedures will be added in the queue.
@@ -502,7 +510,7 @@ public class TestMasterProcedureScheduler {
// we should be able to fetch and execute all the sub-procs,
// since they are operating on different regions
for (int i = 0; i < subProcs.length; ++i) {
- TestRegionProcedure regionProc = (TestRegionProcedure)queue.poll(0);
+ TestRegionProcedure regionProc = (TestRegionProcedure) queue.poll(0);
assertEquals(subProcs[i].getProcId(), regionProc.getProcId());
assertEquals(false, queue.waitRegions(regionProc, tableName, regionProc.getRegionInfo()));
}
@@ -512,7 +520,7 @@ public class TestMasterProcedureScheduler {
// release all the region locks
for (int i = 0; i < subProcs.length; ++i) {
- TestRegionProcedure regionProc = (TestRegionProcedure)subProcs[i];
+ TestRegionProcedure regionProc = (TestRegionProcedure) subProcs[i];
queue.wakeRegions(regionProc, tableName, regionProc.getRegionInfo());
}
@@ -526,27 +534,28 @@ public class TestMasterProcedureScheduler {
@Test
public void testInheritedRegionXLock() {
final TableName tableName = TableName.valueOf(name.getMethodName());
- final HRegionInfo region = new HRegionInfo(tableName, Bytes.toBytes("a"), Bytes.toBytes("b"));
+ final RegionInfo region = RegionInfoBuilder.newBuilder(tableName)
+ .setStartKey(Bytes.toBytes("a")).setEndKey(Bytes.toBytes("b")).build();
queue.addBack(new TestRegionProcedure(1, tableName,
- TableProcedureInterface.TableOperationType.REGION_SPLIT, region));
+ TableProcedureInterface.TableOperationType.REGION_SPLIT, region));
queue.addBack(new TestRegionProcedure(1, 2, tableName,
- TableProcedureInterface.TableOperationType.REGION_UNASSIGN, region));
+ TableProcedureInterface.TableOperationType.REGION_UNASSIGN, region));
queue.addBack(new TestRegionProcedure(3, tableName,
- TableProcedureInterface.TableOperationType.REGION_EDIT, region));
+ TableProcedureInterface.TableOperationType.REGION_EDIT, region));
// fetch the root proc and take the lock on the region
- Procedure rootProc = queue.poll();
+ Procedure<?> rootProc = queue.poll();
assertEquals(1, rootProc.getProcId());
assertEquals(false, queue.waitRegion(rootProc, region));
// fetch the sub-proc and take the lock on the region (inherited lock)
- Procedure childProc = queue.poll();
+ Procedure<?> childProc = queue.poll();
assertEquals(2, childProc.getProcId());
assertEquals(false, queue.waitRegion(childProc, region));
// proc-3 will be fetched but it can't take the lock
- Procedure proc = queue.poll();
+ Procedure<?> proc = queue.poll();
assertEquals(3, proc.getProcId());
assertEquals(true, queue.waitRegion(proc, region));
@@ -570,16 +579,16 @@ public class TestMasterProcedureScheduler {
public void testSuspendedProcedure() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
- queue.addBack(new TestTableProcedure(1, tableName,
- TableProcedureInterface.TableOperationType.READ));
- queue.addBack(new TestTableProcedure(2, tableName,
- TableProcedureInterface.TableOperationType.READ));
+ queue.addBack(
+ new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.READ));
+ queue.addBack(
+ new TestTableProcedure(2, tableName, TableProcedureInterface.TableOperationType.READ));
- Procedure proc = queue.poll();
+ Procedure<?> proc = queue.poll();
assertEquals(1, proc.getProcId());
// suspend
- ProcedureEvent event = new ProcedureEvent("testSuspendedProcedureEvent");
+ ProcedureEvent<?> event = new ProcedureEvent<>("testSuspendedProcedureEvent");
assertEquals(true, event.suspendIfNotReady(proc));
proc = queue.poll();
@@ -594,51 +603,50 @@ public class TestMasterProcedureScheduler {
assertEquals(null, queue.poll(0));
}
- private static HRegionInfo[] generateRegionInfo(final TableName tableName) {
- return new HRegionInfo[] {
- new HRegionInfo(tableName, Bytes.toBytes("a"), Bytes.toBytes("b")),
- new HRegionInfo(tableName, Bytes.toBytes("b"), Bytes.toBytes("c")),
- new HRegionInfo(tableName, Bytes.toBytes("c"), Bytes.toBytes("d")),
- };
+ private static RegionInfo[] generateRegionInfo(final TableName tableName) {
+ return new RegionInfo[] {
+ RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("a"))
+ .setEndKey(Bytes.toBytes("b")).build(),
+ RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("b"))
+ .setEndKey(Bytes.toBytes("c")).build(),
+ RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("c"))
+ .setEndKey(Bytes.toBytes("d")).build() };
}
@Test
public void testParentXLockAndChildrenSharedLock() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
- final HRegionInfo[] regions = generateRegionInfo(tableName);
+ final RegionInfo[] regions = generateRegionInfo(tableName);
final TestRegionProcedure[] childProcs = new TestRegionProcedure[regions.length];
for (int i = 0; i < regions.length; ++i) {
childProcs[i] = new TestRegionProcedure(1, 2 + i, tableName,
- TableProcedureInterface.TableOperationType.REGION_ASSIGN, regions[i]);
+ TableProcedureInterface.TableOperationType.REGION_ASSIGN, regions[i]);
}
testInheritedXLockAndChildrenSharedLock(tableName,
new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.CREATE),
- childProcs
- );
+ childProcs);
}
@Test
public void testRootXLockAndChildrenSharedLock() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
- final HRegionInfo[] regions = generateRegionInfo(tableName);
+ final RegionInfo[] regions = generateRegionInfo(tableName);
final TestRegionProcedure[] childProcs = new TestRegionProcedure[regions.length];
for (int i = 0; i < regions.length; ++i) {
childProcs[i] = new TestRegionProcedure(1, 2, 3 + i, tableName,
- TableProcedureInterface.TableOperationType.REGION_ASSIGN, regions[i]);
+ TableProcedureInterface.TableOperationType.REGION_ASSIGN, regions[i]);
}
testInheritedXLockAndChildrenSharedLock(tableName,
new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.CREATE),
- childProcs
- );
+ childProcs);
}
private void testInheritedXLockAndChildrenSharedLock(final TableName tableName,
- final TestTableProcedure rootProc, final TestRegionProcedure[] childProcs)
- throws Exception {
+ final TestTableProcedure rootProc, final TestRegionProcedure[] childProcs) throws Exception {
queue.addBack(rootProc);
// fetch and acquire first xlock proc
- Procedure parentProc = queue.poll();
+ Procedure<?> parentProc = queue.poll();
assertEquals(rootProc, parentProc);
assertEquals(false, queue.waitTableExclusiveLock(parentProc, tableName));
@@ -648,12 +656,12 @@ public class TestMasterProcedureScheduler {
}
// add another xlock procedure (no parent)
- queue.addBack(new TestTableProcedure(100, tableName,
- TableProcedureInterface.TableOperationType.EDIT));
+ queue.addBack(
+ new TestTableProcedure(100, tableName, TableProcedureInterface.TableOperationType.EDIT));
// fetch and execute child
for (int i = 0; i < childProcs.length; ++i) {
- TestRegionProcedure childProc = (TestRegionProcedure)queue.poll();
+ TestRegionProcedure childProc = (TestRegionProcedure) queue.poll();
LOG.debug("fetch children " + childProc);
assertEquals(false, queue.waitRegions(childProc, tableName, childProc.getRegionInfo()));
queue.wakeRegions(childProc, tableName, childProc.getRegionInfo());
@@ -666,7 +674,7 @@ public class TestMasterProcedureScheduler {
queue.wakeTableExclusiveLock(parentProc, tableName);
// fetch the other xlock proc
- Procedure proc = queue.poll();
+ Procedure<?> proc = queue.poll();
assertEquals(100, proc.getProcId());
assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
queue.wakeTableExclusiveLock(proc, tableName);
@@ -677,8 +685,7 @@ public class TestMasterProcedureScheduler {
final TableName tableName = TableName.valueOf(name.getMethodName());
testInheritedXLockAndChildrenXLock(tableName,
new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.EDIT),
- new TestTableProcedure(1, 2, tableName, TableProcedureInterface.TableOperationType.EDIT)
- );
+ new TestTableProcedure(1, 2, tableName, TableProcedureInterface.TableOperationType.EDIT));
}
@Test
@@ -687,8 +694,7 @@ public class TestMasterProcedureScheduler {
// simulate 3 procedures: 1 (root), (2) child of root, (3) child of proc-2
testInheritedXLockAndChildrenXLock(tableName,
new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.EDIT),
- new TestTableProcedure(1, 2, 3, tableName, TableProcedureInterface.TableOperationType.EDIT)
- );
+ new TestTableProcedure(1, 2, 3, tableName, TableProcedureInterface.TableOperationType.EDIT));
}
private void testInheritedXLockAndChildrenXLock(final TableName tableName,
@@ -696,7 +702,7 @@ public class TestMasterProcedureScheduler {
queue.addBack(rootProc);
// fetch and acquire first xlock proc
- Procedure parentProc = queue.poll();
+ Procedure<?> parentProc = queue.poll();
assertEquals(rootProc, parentProc);
assertEquals(false, queue.waitTableExclusiveLock(parentProc, tableName));
@@ -704,7 +710,7 @@ public class TestMasterProcedureScheduler {
queue.addFront(childProc);
// fetch the other xlock proc
- Procedure proc = queue.poll();
+ Procedure<?> proc = queue.poll();
assertEquals(childProc, proc);
assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
queue.wakeTableExclusiveLock(proc, tableName);
@@ -717,13 +723,13 @@ public class TestMasterProcedureScheduler {
public void testYieldWithXLockHeld() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
- queue.addBack(new TestTableProcedure(1, tableName,
- TableProcedureInterface.TableOperationType.EDIT));
- queue.addBack(new TestTableProcedure(2, tableName,
- TableProcedureInterface.TableOperationType.EDIT));
+ queue.addBack(
+ new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.EDIT));
+ queue.addBack(
+ new TestTableProcedure(2, tableName, TableProcedureInterface.TableOperationType.EDIT));
// fetch from the queue and acquire xlock for the first proc
- Procedure proc = queue.poll();
+ Procedure<?> proc = queue.poll();
assertEquals(1, proc.getProcId());
assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
@@ -748,20 +754,20 @@ public class TestMasterProcedureScheduler {
public void testYieldWithSharedLockHeld() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
- queue.addBack(new TestTableProcedure(1, tableName,
- TableProcedureInterface.TableOperationType.READ));
- queue.addBack(new TestTableProcedure(2, tableName,
- TableProcedureInterface.TableOperationType.READ));
- queue.addBack(new TestTableProcedure(3, tableName,
- TableProcedureInterface.TableOperationType.EDIT));
+ queue.addBack(
+ new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.READ));
+ queue.addBack(
+ new TestTableProcedure(2, tableName, TableProcedureInterface.TableOperationType.READ));
+ queue.addBack(
+ new TestTableProcedure(3, tableName, TableProcedureInterface.TableOperationType.EDIT));
// fetch and acquire the first shared-lock
- Procedure proc1 = queue.poll();
+ Procedure<?> proc1 = queue.poll();
assertEquals(1, proc1.getProcId());
assertEquals(false, queue.waitTableSharedLock(proc1, tableName));
// fetch and acquire the second shared-lock
- Procedure proc2 = queue.poll();
+ Procedure<?> proc2 = queue.poll();
assertEquals(2, proc2.getProcId());
assertEquals(false, queue.waitTableSharedLock(proc2, tableName));
@@ -769,8 +775,8 @@ public class TestMasterProcedureScheduler {
assertEquals(null, queue.poll(0));
// put the procs back in the queue
- queue.yield(proc2);
queue.yield(proc1);
+ queue.yield(proc2);
// fetch from the queue, it should fetch the ones with just added back
proc1 = queue.poll();
@@ -782,12 +788,11 @@ public class TestMasterProcedureScheduler {
queue.wakeTableSharedLock(proc1, tableName);
queue.wakeTableSharedLock(proc2, tableName);
- Procedure proc3 = queue.poll();
+ Procedure<?> proc3 = queue.poll();
assertEquals(3, proc3.getProcId());
}
- public static class TestTableProcedure extends TestProcedure
- implements TableProcedureInterface {
+ public static class TestTableProcedure extends TestProcedure implements TableProcedureInterface {
private final TableOperationType opType;
private final TableName tableName;
@@ -810,6 +815,7 @@ public class TestMasterProcedureScheduler {
this.tableName = tableName;
this.opType = opType;
}
+
@Override
public TableName getTableName() {
return tableName;
@@ -830,14 +836,15 @@ public class TestMasterProcedureScheduler {
}
public static class TestTableProcedureWithEvent extends TestTableProcedure {
- private final ProcedureEvent event;
+ private final ProcedureEvent<?> event;
- public TestTableProcedureWithEvent(long procId, TableName tableName, TableOperationType opType) {
+ public TestTableProcedureWithEvent(long procId, TableName tableName,
+ TableOperationType opType) {
super(procId, tableName, opType);
- event = new ProcedureEvent(tableName + " procId=" + procId);
+ event = new ProcedureEvent<>(tableName + " procId=" + procId);
}
- public ProcedureEvent getEvent() {
+ public ProcedureEvent<?> getEvent() {
return event;
}
}
@@ -933,7 +940,8 @@ public class TestMasterProcedureScheduler {
}
}
- private static LockProcedure createLockProcedure(LockType lockType, long procId) throws Exception {
+ private static LockProcedure createLockProcedure(LockType lockType, long procId)
+ throws Exception {
LockProcedure procedure = new LockProcedure();
Field typeField = LockProcedure.class.getDeclaredField("type");
@@ -1001,7 +1009,7 @@ public class TestMasterProcedureScheduler {
LockedResource tableResource = locks.get(1);
assertLockResource(tableResource, LockedResourceType.TABLE,
- TableName.NAMESPACE_TABLE_NAME.getNameAsString());
+ TableName.NAMESPACE_TABLE_NAME.getNameAsString());
assertSharedLock(tableResource, 1);
assertTrue(tableResource.getWaitingProcedures().isEmpty());
}
@@ -1028,7 +1036,8 @@ public class TestMasterProcedureScheduler {
@Test
public void testListLocksRegion() throws Exception {
LockProcedure procedure = createExclusiveLockProcedure(3);
- HRegionInfo regionInfo = new HRegionInfo(TableName.valueOf("ns3", "table3"));
+ RegionInfo regionInfo =
+ RegionInfoBuilder.newBuilder(TableName.valueOf("ns3", "table3")).build();
queue.waitRegion(procedure, regionInfo);
@@ -1144,4 +1153,3 @@ public class TestMasterProcedureScheduler {
queue.wakeTableExclusiveLock(parentProc, tableName);
}
}
-
http://git-wip-us.apache.org/repos/asf/hbase/blob/d5e4faac/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java
index 1313cdb..7a43f75 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java
@@ -54,7 +54,7 @@ public class TestMasterProcedureSchedulerConcurrency {
@Before
public void setUp() throws IOException {
- queue = new MasterProcedureScheduler();
+ queue = new MasterProcedureScheduler(pid -> null);
queue.start();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d5e4faac/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSchedulerQueueDeadLock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSchedulerQueueDeadLock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSchedulerQueueDeadLock.java
new file mode 100644
index 0000000..5fc08b5
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSchedulerQueueDeadLock.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.concurrent.Semaphore;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/**
+ * Regression tests for table-lock deadlocks in {@link MasterProcedureScheduler} (HBASE-21375).
+ * <p>
+ * The procedures defined here take either the shared or the exclusive lock on
+ * {@link #TABLE_NAME}. They then block inside {@code execute} until the test releases a permit
+ * on their latch, so the test decides exactly when each procedure may finish.
+ */
+@Category({ MasterTests.class, LargeTests.class })
+public class TestSchedulerQueueDeadLock {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestSchedulerQueueDeadLock.class);
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ // Every test procedure in this class locks this same table.
+ private static final TableName TABLE_NAME = TableName.valueOf("deadlock");
+
+ /**
+ * Minimal procedure environment. It only exposes the scheduler, which the test procedures use
+ * to take and release table locks.
+ */
+ private static final class TestEnv {
+ private final MasterProcedureScheduler scheduler;
+
+ public TestEnv(MasterProcedureScheduler scheduler) {
+ this.scheduler = scheduler;
+ }
+
+ public MasterProcedureScheduler getScheduler() {
+ return scheduler;
+ }
+ }
+
+ /**
+ * Procedure that takes the shared (READ) lock on {@link #TABLE_NAME}. It then blocks in
+ * {@code execute} until a permit is released on its {@code latch}.
+ */
+ public static class TableSharedProcedure extends NoopProcedure<TestEnv>
+ implements TableProcedureInterface {
+
+ // Starts with zero permits, so execute() blocks until the test calls latch.release().
+ // A permit released before execute() runs is retained, so the test may release early.
+ private final Semaphore latch = new Semaphore(0);
+
+ @Override
+ protected Procedure<TestEnv>[] execute(TestEnv env)
+ throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+ latch.acquire();
+ // No children; returning null ends the procedure.
+ return null;
+ }
+
+ @Override
+ protected LockState acquireLock(TestEnv env) {
+ // A true return from waitTableSharedLock means the lock was not granted and the caller
+ // must wait, which maps to LOCK_EVENT_WAIT.
+ if (env.getScheduler().waitTableSharedLock(this, getTableName())) {
+ return LockState.LOCK_EVENT_WAIT;
+ }
+ return LockState.LOCK_ACQUIRED;
+ }
+
+ @Override
+ protected void releaseLock(TestEnv env) {
+ env.getScheduler().wakeTableSharedLock(this, getTableName());
+ }
+
+ @Override
+ protected boolean holdLock(TestEnv env) {
+ // Keep the lock across execution steps until the procedure completes
+ // (per Procedure#holdLock semantics -- confirm against the Procedure javadoc).
+ return true;
+ }
+
+ @Override
+ public TableName getTableName() {
+ return TABLE_NAME;
+ }
+
+ @Override
+ public TableOperationType getTableOperationType() {
+ return TableOperationType.READ;
+ }
+ }
+
+ /**
+ * Procedure that takes the exclusive (EDIT) lock on {@link #TABLE_NAME}. It then blocks in
+ * {@code execute} until a permit is released on its {@code latch}.
+ */
+ public static class TableExclusiveProcedure extends NoopProcedure<TestEnv>
+ implements TableProcedureInterface {
+
+ // Starts with zero permits, so execute() blocks until the test calls latch.release().
+ private final Semaphore latch = new Semaphore(0);
+
+ @Override
+ protected Procedure<TestEnv>[] execute(TestEnv env)
+ throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+ latch.acquire();
+ return null;
+ }
+
+ @Override
+ protected LockState acquireLock(TestEnv env) {
+ // A true return from waitTableExclusiveLock means the caller must wait.
+ if (env.getScheduler().waitTableExclusiveLock(this, getTableName())) {
+ return LockState.LOCK_EVENT_WAIT;
+ }
+ return LockState.LOCK_ACQUIRED;
+ }
+
+ @Override
+ protected void releaseLock(TestEnv env) {
+ env.getScheduler().wakeTableExclusiveLock(this, getTableName());
+ }
+
+ @Override
+ protected boolean holdLock(TestEnv env) {
+ // Keep the lock across execution steps until the procedure completes.
+ return true;
+ }
+
+ @Override
+ public TableName getTableName() {
+ return TABLE_NAME;
+ }
+
+ @Override
+ public TableOperationType getTableOperationType() {
+ return TableOperationType.EDIT;
+ }
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws IOException {
+ // Remove the test directory used by UTIL (the per-test WAL store directories live under it).
+ UTIL.cleanupTestDir();
+ }
+
+ // WAL-backed store, so procedures can be reloaded when the executor is restarted.
+ private WALProcedureStore procStore;
+
+ private ProcedureExecutor<TestEnv> procExec;
+
+ // Gives each test method its own store directory.
+ @Rule
+ public final TestName name = new TestName();
+
+ @Before
+ public void setUp() throws IOException {
+ // 6,000,000 ms = 100 minutes. Presumably this keeps the executor's stuck-worker detection
+ // from kicking in while a worker is deliberately blocked on a latch.
+ UTIL.getConfiguration().setInt("hbase.procedure.worker.stuck.threshold.msec", 6000000);
+ procStore = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(),
+ UTIL.getDataTestDir(name.getMethodName()));
+ procStore.start(1);
+ // The constructor argument appears to be a procedure-id lookup; it is stubbed to always
+ // return null here (NOTE(review): confirm the tests do not depend on it).
+ MasterProcedureScheduler scheduler = new MasterProcedureScheduler(pid -> null);
+ procExec = new ProcedureExecutor<>(UTIL.getConfiguration(), new TestEnv(scheduler), procStore,
+ scheduler);
+ // Assumed to mean one worker thread and no abort on store corruption -- confirm against
+ // ProcedureExecutor#init.
+ procExec.init(1, false);
+ }
+
+ @After
+ public void tearDown() {
+ // Stop the executor before the store it writes to.
+ procExec.stop();
+ procStore.stop(false);
+ }
+
+ /**
+ * Shared-lock procedure whose id is always 2, whatever id the executor assigns.
+ */
+ public static final class TableSharedProcedureWithId extends TableSharedProcedure {
+
+ @Override
+ protected void setProcId(long procId) {
+ // Test-only hack. Procedures are sorted by id when loaded, so pinning this one to id 2
+ // makes it load after TableExclusiveProcedureWithId (pinned to id 1).
+ super.setProcId(2L);
+ }
+ }
+
+ /**
+ * Exclusive-lock procedure whose id is always 1, whatever id the executor assigns.
+ */
+ public static final class TableExclusiveProcedureWithId extends TableExclusiveProcedure {
+
+ @Override
+ protected void setProcId(long procId) {
+ // Test-only hack. Procedures are sorted by id when loaded, so pinning this one to id 1
+ // makes it load before TableSharedProcedureWithId (pinned to id 2).
+ super.setProcId(1L);
+ }
+ }
+
+ /**
+ * Scenario: the shared procedure is submitted first, acquires the table lock and starts
+ * executing, but it carries the larger id. After a restart, the exclusive procedure is
+ * therefore loaded ahead of it. Both procedures must still finish instead of deadlocking on
+ * the table lock.
+ */
+ @Test
+ public void testTableProcedureDeadLockAfterRestarting() throws Exception {
+ // Submit the shared procedure first so it runs first. Its pinned id (2) is larger than the
+ // exclusive procedure's (1), so it is loaded last when the executor restarts.
+ long procId1 = procExec.submitProcedure(new TableSharedProcedureWithId());
+ long procId2 = procExec.submitProcedure(new TableExclusiveProcedureWithId());
+ procExec.startWorkers();
+ // Wait until the shared procedure is blocked inside execute(), which means it already
+ // holds the shared table lock.
+ UTIL.waitFor(10000,
+ () -> ((TableSharedProcedure) procExec.getProcedure(procId1)).latch.hasQueuedThreads());
+
+ ProcedureTestingUtility.restart(procExec);
+
+ // Look both procedures up again after the restart (the executor may now hold reloaded
+ // instances), then release one permit on each latch so both can complete.
+ ((TableSharedProcedure) procExec.getProcedure(procId1)).latch.release();
+ ((TableExclusiveProcedure) procExec.getProcedure(procId2)).latch.release();
+
+ // If either wait times out, the procedures are deadlocked on the table lock.
+ UTIL.waitFor(10000, () -> procExec.isFinished(procId1));
+ UTIL.waitFor(10000, () -> procExec.isFinished(procId2));
+ }
+
+ /**
+ * Procedure that takes the shared (READ) lock on {@link #TABLE_NAME}. On its first execution
+ * it spawns a {@link TableSharedProcedure} child. When executed again (presumably after the
+ * child completes), it returns null and finishes.
+ * <p>
+ * NOTE(review): "Shard" in the class name looks like a typo for "Shared".
+ */
+ public static final class TableShardParentProcedure extends NoopProcedure<TestEnv>
+ implements TableProcedureInterface {
+
+ // Records whether the child has been spawned. This class adds no serialization for it, so
+ // the flag is only meaningful within a single executor lifetime; the test using this class
+ // never restarts.
+ private boolean scheduled;
+
+ @Override
+ protected Procedure<TestEnv>[] execute(TestEnv env)
+ throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+ if (!scheduled) {
+ scheduled = true;
+ // NOTE(review): the raw Procedure[] array is an unchecked conversion to
+ // Procedure<TestEnv>[].
+ return new Procedure[] { new TableSharedProcedure() };
+ }
+ return null;
+ }
+
+ @Override
+ protected LockState acquireLock(TestEnv env) {
+ // A true return from waitTableSharedLock means the caller must wait.
+ if (env.getScheduler().waitTableSharedLock(this, getTableName())) {
+ return LockState.LOCK_EVENT_WAIT;
+ }
+ return LockState.LOCK_ACQUIRED;
+ }
+
+ @Override
+ protected void releaseLock(TestEnv env) {
+ env.getScheduler().wakeTableSharedLock(this, getTableName());
+ }
+
+ @Override
+ protected boolean holdLock(TestEnv env) {
+ // Keep the shared lock while the child runs.
+ return true;
+ }
+
+ @Override
+ public TableName getTableName() {
+ return TABLE_NAME;
+ }
+
+ @Override
+ public TableOperationType getTableOperationType() {
+ return TableOperationType.READ;
+ }
+ }
+
+ /**
+ * Scenario: the parent procedure holds the shared lock, and the exclusive procedure is
+ * submitted after it and waits for the table. The parent then spawns a child that also needs
+ * the shared lock. The child must not get stuck behind the waiting exclusive procedure;
+ * otherwise the parent, the child and the exclusive procedure would wait on each other forever.
+ */
+ @Test
+ public void testTableProcedureSubProcedureDeadLock() throws Exception {
+ // The parent holds the shared lock and later spawns a shared child, by which time the
+ // exclusive procedure is expected to be queued for the lock.
+ long procId1 = procExec.submitProcedure(new TableShardParentProcedure());
+ long procId2 = procExec.submitProcedure(new TableExclusiveProcedure());
+ procExec.startWorkers();
+ // Wait until the parent has spawned its TableSharedProcedure child.
+ UTIL.waitFor(10000,
+ () -> procExec.getProcedures().stream().anyMatch(p -> p instanceof TableSharedProcedure));
+ // Release the child's latch. Only the child matches this filter, because
+ // TableShardParentProcedure does not extend TableSharedProcedure.
+ procExec.getProcedures().stream().filter(p -> p instanceof TableSharedProcedure)
+ .map(p -> (TableSharedProcedure) p).forEach(p -> p.latch.release());
+ // Pre-release the exclusive procedure's permit so it finishes as soon as it gets the lock.
+ ((TableExclusiveProcedure) procExec.getProcedure(procId2)).latch.release();
+
+ // If either wait times out, the procedures are deadlocked on the table lock.
+ UTIL.waitFor(10000, () -> procExec.isFinished(procId1));
+ UTIL.waitFor(10000, () -> procExec.isFinished(procId2));
+ }
+}