You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/02/15 05:50:16 UTC
hbase git commit: HBASE-20000 Remove the quantum logic in FairQueue,
always put high priority queue in front
Repository: hbase
Updated Branches:
refs/heads/master d0f2d18ca -> c18e7a963
HBASE-20000 Remove the quantum logic in FairQueue, always put high priority queue in front
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c18e7a96
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c18e7a96
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c18e7a96
Branch: refs/heads/master
Commit: c18e7a963d9c4dc862c4706f128a4e436111669c
Parents: d0f2d18
Author: zhangduo <zh...@apache.org>
Authored: Thu Feb 15 13:49:54 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu Feb 15 13:49:54 2018 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/util/AvlUtil.java | 14 +
.../hbase/master/procedure/FairQueue.java | 80 +++
.../master/procedure/MasterProcedureEnv.java | 2 +-
.../procedure/MasterProcedureScheduler.java | 560 +------------------
.../master/procedure/MasterProcedureUtil.java | 38 +-
.../hbase/master/procedure/PeerQueue.java | 54 ++
.../hadoop/hbase/master/procedure/Queue.java | 115 ++++
.../hbase/master/procedure/SchemaLocking.java | 214 +++++++
.../hbase/master/procedure/ServerQueue.java | 43 ++
.../hbase/master/procedure/TableQueue.java | 89 +++
...ProcedureSchedulerPerformanceEvaluation.java | 2 +-
.../procedure/TestMasterProcedureScheduler.java | 35 +-
...TestMasterProcedureSchedulerConcurrency.java | 6 +-
13 files changed, 676 insertions(+), 576 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/c18e7a96/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AvlUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AvlUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AvlUtil.java
index 7823360..6b6eaef 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AvlUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AvlUtil.java
@@ -549,6 +549,20 @@ public final class AvlUtil {
}
/**
+ * @param head the head of the linked list
+ * @param base the node before which {@code node} will be inserted
+ * @param node the node to insert before the {@code base} node
+ */
+ public static <TNode extends AvlLinkedNode> TNode prepend(TNode head, TNode base, TNode node) {
+ assert !isLinked(node) : node + " is already linked";
+ node.iterNext = base;
+ node.iterPrev = base.iterPrev;
+ base.iterPrev.iterNext = node;
+ base.iterPrev = node;
+ return head == base ? node : head;
+ }
+
+ /**
* @param node the node to check
* @return true if the node is linked to a list, false otherwise
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/c18e7a96/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java
new file mode 100644
index 0000000..ac8e577
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class FairQueue<T extends Comparable<T>> {
+
+ private Queue<T> queueHead = null;
+ private int size = 0;
+
+ public boolean hasRunnables() {
+ return size > 0;
+ }
+
+ public void add(Queue<T> queue) {
+ // If the list is empty or this is a normal priority queue, just append it to the tail
+ if (queueHead == null || queue.getPriority() == 1) {
+ queueHead = AvlIterableList.append(queueHead, queue);
+ size++;
+ return;
+ }
+ // Find the first queue whose priority is lower than ours
+ // For now only TableQueue has priority, and there are only a small number of tables which
+ // have higher priority so this will not be an expensive operation.
+ Queue<T> base = queueHead;
+ do {
+ if (base.getPriority() < queue.getPriority()) {
+ queueHead = AvlIterableList.prepend(queueHead, base, queue);
+ size++;
+ return;
+ }
+ base = AvlIterableList.readNext(base);
+ } while (base != queueHead);
+ // No queue has a lower priority than ours, so append to the tail
+ queueHead = AvlIterableList.append(queueHead, queue);
+ size++;
+ }
+
+ public void remove(Queue<T> queue) {
+ queueHead = AvlIterableList.remove(queueHead, queue);
+ size--;
+ }
+
+ public Queue<T> poll() {
+ if (queueHead == null) {
+ return null;
+ }
+ Queue<T> q = queueHead;
+ do {
+ if (q.isAvailable()) {
+ if (q.getPriority() == 1) {
+ // for the normal priority queue, remove it and append it to the tail
+ queueHead = AvlIterableList.remove(queueHead, q);
+ queueHead = AvlIterableList.append(queueHead, q);
+ }
+ return q;
+ }
+ q = AvlIterableList.readNext(q);
+ } while (q != queueHead);
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c18e7a96/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
index ae038a5..7fb187f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
@@ -106,7 +106,7 @@ public class MasterProcedureEnv implements ConfigurationObserver {
public MasterProcedureEnv(final MasterServices master,
final RSProcedureDispatcher remoteDispatcher) {
this.master = master;
- this.procSched = new MasterProcedureScheduler(master.getConfiguration());
+ this.procSched = new MasterProcedureScheduler();
this.remoteDispatcher = remoteDispatcher;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c18e7a96/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
index 936540d..5cc9298 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
@@ -19,38 +19,27 @@
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.master.locking.LockProcedure;
-import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType;
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler;
import org.apache.hadoop.hbase.procedure2.LockAndQueue;
-import org.apache.hadoop.hbase.procedure2.LockStatus;
-import org.apache.hadoop.hbase.procedure2.LockType;
import org.apache.hadoop.hbase.procedure2.LockedResource;
import org.apache.hadoop.hbase.procedure2.LockedResourceType;
import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureDeque;
import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList;
import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator;
-import org.apache.hadoop.hbase.util.AvlUtil.AvlLinkedNode;
import org.apache.hadoop.hbase.util.AvlUtil.AvlTree;
import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
@@ -106,12 +95,12 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
public class MasterProcedureScheduler extends AbstractProcedureScheduler {
private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureScheduler.class);
- private final static ServerQueueKeyComparator SERVER_QUEUE_KEY_COMPARATOR =
- new ServerQueueKeyComparator();
- private final static TableQueueKeyComparator TABLE_QUEUE_KEY_COMPARATOR =
- new TableQueueKeyComparator();
- private final static PeerQueueKeyComparator PEER_QUEUE_KEY_COMPARATOR =
- new PeerQueueKeyComparator();
+ private static final AvlKeyComparator<ServerQueue> SERVER_QUEUE_KEY_COMPARATOR =
+ (n, k) -> n.compareKey((ServerName) k);
+ private final static AvlKeyComparator<TableQueue> TABLE_QUEUE_KEY_COMPARATOR =
+ (n, k) -> n.compareKey((TableName) k);
+ private final static AvlKeyComparator<PeerQueue> PEER_QUEUE_KEY_COMPARATOR =
+ (n, k) -> n.compareKey((String) k);
private final FairQueue<ServerName> serverRunQueue = new FairQueue<>();
private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
@@ -123,39 +112,6 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
private final SchemaLocking locking = new SchemaLocking();
- /**
- * Table priority is used when scheduling procedures from {@link #tableRunQueue}. A TableQueue
- * with priority 2 will get its procedures scheduled at twice the rate as compared to
- * TableQueue with priority 1. This should be enough to ensure system/meta get assigned out
- * before user-space tables. HBASE-18109 is where we conclude what is here is good enough.
- * Lets open new issue if we find it not enough.
- */
- private static class TablePriorities {
- final int metaTablePriority;
- final int userTablePriority;
- final int sysTablePriority;
-
- TablePriorities(Configuration conf) {
- metaTablePriority = conf.getInt("hbase.master.procedure.queue.meta.table.priority", 3);
- sysTablePriority = conf.getInt("hbase.master.procedure.queue.system.table.priority", 2);
- userTablePriority = conf.getInt("hbase.master.procedure.queue.user.table.priority", 1);
- }
-
- int getPriority(TableName tableName) {
- if (tableName.equals(TableName.META_TABLE_NAME)) {
- return metaTablePriority;
- } else if (tableName.isSystemTable()) {
- return sysTablePriority;
- }
- return userTablePriority;
- }
- }
- private final TablePriorities tablePriorities;
-
- public MasterProcedureScheduler(final Configuration conf) {
- tablePriorities = new TablePriorities(conf);
- }
-
@Override
public void yield(final Procedure proc) {
push(proc, isTableProcedure(proc), true);
@@ -216,13 +172,13 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
return pollResult;
}
- private <T extends Comparable<T>> Procedure doPoll(final FairQueue<T> fairq) {
+ private <T extends Comparable<T>> Procedure<?> doPoll(final FairQueue<T> fairq) {
final Queue<T> rq = fairq.poll();
if (rq == null || !rq.isAvailable()) {
return null;
}
- final Procedure pollResult = rq.peek();
+ final Procedure<?> pollResult = rq.peek();
if (pollResult == null) {
return null;
}
@@ -240,7 +196,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
// if the rq is in the fairq because of runnable child
// check if the next procedure is still a child.
// if not, remove the rq from the fairq and go back to the xlock state
- Procedure nextProc = rq.peek();
+ Procedure<?> nextProc = rq.peek();
if (nextProc != null && !Procedure.haveSameParent(nextProc, pollResult)) {
removeFromRunQueue(fairq, rq);
}
@@ -249,61 +205,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
return pollResult;
}
- private LockedResource createLockedResource(LockedResourceType resourceType,
- String resourceName, LockAndQueue queue) {
- LockType lockType;
- Procedure<?> exclusiveLockOwnerProcedure;
- int sharedLockCount;
-
- if (queue.hasExclusiveLock()) {
- lockType = LockType.EXCLUSIVE;
- exclusiveLockOwnerProcedure = queue.getExclusiveLockOwnerProcedure();
- sharedLockCount = 0;
- } else {
- lockType = LockType.SHARED;
- exclusiveLockOwnerProcedure = null;
- sharedLockCount = queue.getSharedLockCount();
- }
-
- List<Procedure<?>> waitingProcedures = new ArrayList<>();
-
- for (Procedure<?> procedure : queue) {
- if (!(procedure instanceof LockProcedure)) {
- continue;
- }
-
- waitingProcedures.add(procedure);
- }
-
- return new LockedResource(resourceType, resourceName, lockType,
- exclusiveLockOwnerProcedure, sharedLockCount, waitingProcedures);
- }
-
- private <T> void addToLockedResources(List<LockedResource> lockedResources,
- Map<T, LockAndQueue> locks, Function<T, String> keyTransformer,
- LockedResourceType resourcesType) {
- locks.entrySet().stream().filter(e -> e.getValue().isLocked())
- .map(
- e -> createLockedResource(resourcesType, keyTransformer.apply(e.getKey()), e.getValue()))
- .forEachOrdered(lockedResources::add);
- }
-
@Override
public List<LockedResource> getLocks() {
schedLock();
try {
- List<LockedResource> lockedResources = new ArrayList<>();
- addToLockedResources(lockedResources, locking.serverLocks, sn -> sn.getServerName(),
- LockedResourceType.SERVER);
- addToLockedResources(lockedResources, locking.namespaceLocks, Function.identity(),
- LockedResourceType.NAMESPACE);
- addToLockedResources(lockedResources, locking.tableLocks, tn -> tn.getNameAsString(),
- LockedResourceType.TABLE);
- addToLockedResources(lockedResources, locking.regionLocks, Function.identity(),
- LockedResourceType.REGION);
- addToLockedResources(lockedResources, locking.peerLocks, Function.identity(),
- LockedResourceType.PEER);
- return lockedResources;
+ return locking.getLocks();
} finally {
schedUnlock();
}
@@ -311,27 +217,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
@Override
public LockedResource getLockResource(LockedResourceType resourceType, String resourceName) {
- LockAndQueue queue = null;
schedLock();
try {
- switch (resourceType) {
- case SERVER:
- queue = locking.serverLocks.get(ServerName.valueOf(resourceName));
- break;
- case NAMESPACE:
- queue = locking.namespaceLocks.get(resourceName);
- break;
- case TABLE:
- queue = locking.tableLocks.get(TableName.valueOf(resourceName));
- break;
- case REGION:
- queue = locking.regionLocks.get(resourceName);
- break;
- case PEER:
- queue = locking.peerLocks.get(resourceName);
- break;
- }
- return queue != null ? createLockedResource(resourceType, resourceName, queue) : null;
+ return locking.getLockResource(resourceType, resourceName);
} finally {
schedUnlock();
}
@@ -348,7 +236,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
}
}
- protected void clearQueue() {
+ private void clearQueue() {
// Remove Servers
for (int i = 0; i < serverBuckets.length; ++i) {
clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
@@ -450,7 +338,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
if (node != null) return node;
- node = new TableQueue(tableName, tablePriorities.getPriority(tableName),
+ node = new TableQueue(tableName, MasterProcedureUtil.getTablePriority(tableName),
locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString()));
tableMap = AvlTree.insert(tableMap, node);
return node;
@@ -512,7 +400,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
locking.removePeerLock(peerId);
}
- private void tryCleanupPeerQueue(String peerId, Procedure procedure) {
+ private void tryCleanupPeerQueue(String peerId, Procedure<?> procedure) {
schedLock();
try {
PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
@@ -539,147 +427,12 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
}
// ============================================================================
- // Table and Server Queue Implementation
- // ============================================================================
- private static class ServerQueueKeyComparator implements AvlKeyComparator<ServerQueue> {
- @Override
- public int compareKey(ServerQueue node, Object key) {
- return node.compareKey((ServerName)key);
- }
- }
-
- public static class ServerQueue extends Queue<ServerName> {
- public ServerQueue(ServerName serverName, LockStatus serverLock) {
- super(serverName, serverLock);
- }
-
- @Override
- public boolean requireExclusiveLock(Procedure proc) {
- ServerProcedureInterface spi = (ServerProcedureInterface)proc;
- switch (spi.getServerOperationType()) {
- case CRASH_HANDLER:
- return true;
- default:
- break;
- }
- throw new UnsupportedOperationException("unexpected type " + spi.getServerOperationType());
- }
- }
-
- private static class TableQueueKeyComparator implements AvlKeyComparator<TableQueue> {
- @Override
- public int compareKey(TableQueue node, Object key) {
- return node.compareKey((TableName)key);
- }
- }
-
- public static class TableQueue extends Queue<TableName> {
- private final LockStatus namespaceLockStatus;
-
- public TableQueue(TableName tableName, int priority, LockStatus tableLock,
- LockStatus namespaceLockStatus) {
- super(tableName, priority, tableLock);
- this.namespaceLockStatus = namespaceLockStatus;
- }
-
- @Override
- public boolean isAvailable() {
- // if there are no items in the queue, or the namespace is locked.
- // we can't execute operation on this table
- if (isEmpty() || namespaceLockStatus.hasExclusiveLock()) {
- return false;
- }
-
- if (getLockStatus().hasExclusiveLock()) {
- // if we have an exclusive lock already taken
- // only child of the lock owner can be executed
- final Procedure nextProc = peek();
- return nextProc != null && getLockStatus().hasLockAccess(nextProc);
- }
-
- // no xlock
- return true;
- }
-
- @Override
- public boolean requireExclusiveLock(Procedure proc) {
- return requireTableExclusiveLock((TableProcedureInterface)proc);
- }
- }
-
- private static class PeerQueueKeyComparator implements AvlKeyComparator<PeerQueue> {
-
- @Override
- public int compareKey(PeerQueue node, Object key) {
- return node.compareKey((String) key);
- }
- }
-
- public static class PeerQueue extends Queue<String> {
-
- public PeerQueue(String peerId, LockStatus lockStatus) {
- super(peerId, lockStatus);
- }
-
- @Override
- public boolean requireExclusiveLock(Procedure proc) {
- return requirePeerExclusiveLock((PeerProcedureInterface) proc);
- }
-
- @Override
- public boolean isAvailable() {
- if (isEmpty()) {
- return false;
- }
- if (getLockStatus().hasExclusiveLock()) {
- // if we have an exclusive lock already taken
- // only child of the lock owner can be executed
- Procedure nextProc = peek();
- return nextProc != null && getLockStatus().hasLockAccess(nextProc);
- }
- return true;
- }
- }
-
- // ============================================================================
// Table Locking Helpers
// ============================================================================
/**
- * @param proc must not be null
- */
- private static boolean requireTableExclusiveLock(TableProcedureInterface proc) {
- switch (proc.getTableOperationType()) {
- case CREATE:
- case DELETE:
- case DISABLE:
- case ENABLE:
- return true;
- case EDIT:
- // we allow concurrent edit on the NS table
- return !proc.getTableName().equals(TableName.NAMESPACE_TABLE_NAME);
- case READ:
- return false;
- // region operations are using the shared-lock on the table
- // and then they will grab an xlock on the region.
- case REGION_SPLIT:
- case REGION_MERGE:
- case REGION_ASSIGN:
- case REGION_UNASSIGN:
- case REGION_EDIT:
- case REGION_GC:
- case MERGED_REGIONS_GC:
- return false;
- default:
- break;
- }
- throw new UnsupportedOperationException("unexpected type " +
- proc.getTableOperationType());
- }
-
- /**
* Get lock info for a resource of specified type and name and log details
*/
- protected void logLockedResource(LockedResourceType resourceType, String resourceName) {
+ private void logLockedResource(LockedResourceType resourceType, String resourceName) {
if (!LOG.isDebugEnabled()) {
return;
}
@@ -765,7 +518,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
return waitTableQueueSharedLock(procedure, table) == null;
}
- private TableQueue waitTableQueueSharedLock(final Procedure procedure, final TableName table) {
+ private TableQueue waitTableQueueSharedLock(final Procedure<?> procedure, final TableName table) {
schedLock();
try {
final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
@@ -821,7 +574,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
* other new operations pending for that table (e.g. a new create).
*/
@VisibleForTesting
- protected boolean markTableAsDeleted(final TableName table, final Procedure procedure) {
+ boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) {
schedLock();
try {
final TableQueue queue = getTableQueue(table);
@@ -1067,11 +820,6 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
// ============================================================================
// Peer Locking Helpers
// ============================================================================
-
- private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) {
- return proc.getPeerOperationType() != PeerOperationType.REFRESH;
- }
-
/**
* Try to acquire the exclusive lock on the specified peer.
* @see #wakePeerExclusiveLock(Procedure, String)
@@ -1114,279 +862,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
}
}
- // ============================================================================
- // Generic Helpers
- // ============================================================================
- private static abstract class Queue<TKey extends Comparable<TKey>>
- extends AvlLinkedNode<Queue<TKey>> {
-
- /**
- * @param proc must not be null
- */
- abstract boolean requireExclusiveLock(Procedure proc);
-
- private final TKey key;
- private final int priority;
- private final ProcedureDeque runnables = new ProcedureDeque();
- // Reference to status of lock on entity this queue represents.
- private final LockStatus lockStatus;
-
- public Queue(TKey key, LockStatus lockStatus) {
- this(key, 1, lockStatus);
- }
-
- public Queue(TKey key, int priority, LockStatus lockStatus) {
- this.key = key;
- this.priority = priority;
- this.lockStatus = lockStatus;
- }
-
- protected TKey getKey() {
- return key;
- }
-
- protected int getPriority() {
- return priority;
- }
-
- protected LockStatus getLockStatus() {
- return lockStatus;
- }
-
- // This should go away when we have the new AM and its events
- // and we move xlock to the lock-event-queue.
- public boolean isAvailable() {
- return !lockStatus.hasExclusiveLock() && !isEmpty();
- }
-
- // ======================================================================
- // Functions to handle procedure queue
- // ======================================================================
- public void add(final Procedure proc, final boolean addToFront) {
- if (addToFront) {
- runnables.addFirst(proc);
- } else {
- runnables.addLast(proc);
- }
- }
-
- public Procedure peek() {
- return runnables.peek();
- }
-
- public Procedure poll() {
- return runnables.poll();
- }
-
- public boolean isEmpty() {
- return runnables.isEmpty();
- }
-
- public int size() {
- return runnables.size();
- }
-
- // ======================================================================
- // Generic Helpers
- // ======================================================================
- public int compareKey(TKey cmpKey) {
- return key.compareTo(cmpKey);
- }
-
- @Override
- public int compareTo(Queue<TKey> other) {
- return compareKey(other.key);
- }
-
- @Override
- public String toString() {
- return String.format("%s(%s, xlock=%s sharedLock=%s size=%s)",
- getClass().getSimpleName(), key,
- lockStatus.hasExclusiveLock() ?
- "true (" + lockStatus.getExclusiveLockProcIdOwner() + ")" : "false",
- lockStatus.getSharedLockCount(), size());
- }
- }
-
- /**
- * Locks on namespaces, tables, and regions.
- * Since LockAndQueue implementation is NOT thread-safe, schedLock() guards all calls to these
- * locks.
- */
- private static class SchemaLocking {
- final Map<ServerName, LockAndQueue> serverLocks = new HashMap<>();
- final Map<String, LockAndQueue> namespaceLocks = new HashMap<>();
- final Map<TableName, LockAndQueue> tableLocks = new HashMap<>();
- // Single map for all regions irrespective of tables. Key is encoded region name.
- final Map<String, LockAndQueue> regionLocks = new HashMap<>();
- final Map<String, LockAndQueue> peerLocks = new HashMap<>();
-
- private <T> LockAndQueue getLock(Map<T, LockAndQueue> map, T key) {
- LockAndQueue lock = map.get(key);
- if (lock == null) {
- lock = new LockAndQueue();
- map.put(key, lock);
- }
- return lock;
- }
-
- LockAndQueue getTableLock(TableName tableName) {
- return getLock(tableLocks, tableName);
- }
-
- LockAndQueue removeTableLock(TableName tableName) {
- return tableLocks.remove(tableName);
- }
-
- LockAndQueue getNamespaceLock(String namespace) {
- return getLock(namespaceLocks, namespace);
- }
-
- LockAndQueue getRegionLock(String encodedRegionName) {
- return getLock(regionLocks, encodedRegionName);
- }
-
- LockAndQueue removeRegionLock(String encodedRegionName) {
- return regionLocks.remove(encodedRegionName);
- }
-
- LockAndQueue getServerLock(ServerName serverName) {
- return getLock(serverLocks, serverName);
- }
-
- LockAndQueue getPeerLock(String peerId) {
- return getLock(peerLocks, peerId);
- }
-
- LockAndQueue removePeerLock(String peerId) {
- return peerLocks.remove(peerId);
- }
-
- /**
- * Removes all locks by clearing the maps.
- * Used when procedure executor is stopped for failure and recovery testing.
- */
- @VisibleForTesting
- void clear() {
- serverLocks.clear();
- namespaceLocks.clear();
- tableLocks.clear();
- regionLocks.clear();
- peerLocks.clear();
- }
-
- @Override
- public String toString() {
- return "serverLocks=" + filterUnlocked(this.serverLocks) +
- ", namespaceLocks=" + filterUnlocked(this.namespaceLocks) +
- ", tableLocks=" + filterUnlocked(this.tableLocks) +
- ", regionLocks=" + filterUnlocked(this.regionLocks) +
- ", peerLocks=" + filterUnlocked(this.peerLocks);
- }
-
- private String filterUnlocked(Map<?, LockAndQueue> locks) {
- StringBuilder sb = new StringBuilder("{");
- int initialLength = sb.length();
- for (Map.Entry<?, LockAndQueue> entry: locks.entrySet()) {
- if (!entry.getValue().isLocked()) continue;
- if (sb.length() > initialLength) sb.append(", ");
- sb.append("{");
- sb.append(entry.getKey());
- sb.append("=");
- sb.append(entry.getValue());
- sb.append("}");
- }
- sb.append("}");
- return sb.toString();
- }
- }
-
- // ======================================================================
- // Helper Data Structures
- // ======================================================================
-
- private static class FairQueue<T extends Comparable<T>> {
- private final int quantum;
-
- private Queue<T> currentQueue = null;
- private Queue<T> queueHead = null;
- private int currentQuantum = 0;
- private int size = 0;
-
- public FairQueue() {
- this(1);
- }
-
- public FairQueue(int quantum) {
- this.quantum = quantum;
- }
-
- public boolean hasRunnables() {
- return size > 0;
- }
-
- public void add(Queue<T> queue) {
- queueHead = AvlIterableList.append(queueHead, queue);
- if (currentQueue == null) setNextQueue(queueHead);
- size++;
- }
-
- public void remove(Queue<T> queue) {
- Queue<T> nextQueue = AvlIterableList.readNext(queue);
- queueHead = AvlIterableList.remove(queueHead, queue);
- if (currentQueue == queue) {
- setNextQueue(queueHead != null ? nextQueue : null);
- }
- size--;
- }
-
- public Queue<T> poll() {
- if (currentQuantum == 0) {
- if (!nextQueue()) {
- return null; // nothing here
- }
- currentQuantum = calculateQuantum(currentQueue) - 1;
- } else {
- currentQuantum--;
- }
-
- // This should go away when we have the new AM and its events
- if (!currentQueue.isAvailable()) {
- Queue<T> lastQueue = currentQueue;
- do {
- if (!nextQueue())
- return null;
- } while (currentQueue != lastQueue && !currentQueue.isAvailable());
-
- currentQuantum = calculateQuantum(currentQueue) - 1;
- }
- return currentQueue;
- }
-
- private boolean nextQueue() {
- if (currentQueue == null) return false;
- currentQueue = AvlIterableList.readNext(currentQueue);
- return currentQueue != null;
- }
-
- private void setNextQueue(Queue<T> queue) {
- currentQueue = queue;
- if (queue != null) {
- currentQuantum = calculateQuantum(currentQueue);
- } else {
- currentQuantum = 0;
- }
- }
-
- private int calculateQuantum(final Queue queue) {
- return Math.max(1, queue.getPriority() * quantum); // TODO
- }
- }
-
/**
* For debugging. Expensive.
- * @throws IOException
- */
+ */
@VisibleForTesting
public String dumpLocks() throws IOException {
schedLock();
http://git-wip-us.apache.org/repos/asf/hbase/blob/c18e7a96/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
index 4afd711..51e2452 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
@@ -15,28 +15,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.util.regex.Pattern;
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class MasterProcedureUtil {
- private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureUtil.class);
private MasterProcedureUtil() {}
@@ -102,7 +99,7 @@ public final class MasterProcedureUtil {
protected abstract void run() throws IOException;
protected abstract String getDescription();
- protected long submitProcedure(final Procedure proc) {
+ protected long submitProcedure(final Procedure<?> proc) {
assert procId == null : "submitProcedure() was already called, running procId=" + procId;
procId = getProcedureExecutor().submitProcedure(proc, nonceKey);
return procId;
@@ -157,4 +154,27 @@ public final class MasterProcedureUtil {
public static boolean validateProcedureWALFilename(String filename) {
return pattern.matcher(filename).matches();
}
+
+ /**
+ * Return the priority for the given table. Now meta table is 3, other system tables are 2, and
+ * user tables are 1.
+ */
+ public static int getTablePriority(TableName tableName) {
+ if (TableName.isMetaTableName(tableName)) {
+ return 3;
+ } else if (tableName.isSystemTable()) {
+ return 2;
+ } else {
+ return 1;
+ }
+ }
+
+ /**
+ * Return the total levels of table priority. Now we have 3 levels, for meta table, other system
+ * tables and user tables. Notice that the actual priority values range from this value down to
+ * 1.
+ */
+ public static int getTablePriorityLevels() {
+ return 3;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c18e7a96/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java
new file mode 100644
index 0000000..1ae0c2f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType;
+import org.apache.hadoop.hbase.procedure2.LockStatus;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+class PeerQueue extends Queue<String> {
+
+ public PeerQueue(String peerId, LockStatus lockStatus) {
+ super(peerId, lockStatus);
+ }
+
+ @Override
+ public boolean isAvailable() {
+ if (isEmpty()) {
+ return false;
+ }
+ if (getLockStatus().hasExclusiveLock()) {
+ // if we have an exclusive lock already taken
+ // only child of the lock owner can be executed
+ Procedure<?> nextProc = peek();
+ return nextProc != null && getLockStatus().hasLockAccess(nextProc);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean requireExclusiveLock(Procedure<?> proc) {
+ return requirePeerExclusiveLock((PeerProcedureInterface) proc);
+ }
+
+ private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) {
+ return proc.getPeerOperationType() != PeerOperationType.REFRESH;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c18e7a96/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java
new file mode 100644
index 0000000..f7bea2a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import org.apache.hadoop.hbase.procedure2.LockStatus;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureDeque;
+import org.apache.hadoop.hbase.util.AvlUtil.AvlLinkedNode;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+abstract class Queue<TKey extends Comparable<TKey>> extends AvlLinkedNode<Queue<TKey>> {
+
+ /**
+ * @param proc must not be null
+ */
+ abstract boolean requireExclusiveLock(Procedure<?> proc);
+
+ private final TKey key;
+ private final int priority;
+ private final ProcedureDeque runnables = new ProcedureDeque();
+ // Reference to status of lock on entity this queue represents.
+ private final LockStatus lockStatus;
+
+ protected Queue(TKey key, LockStatus lockStatus) {
+ this(key, 1, lockStatus);
+ }
+
+ protected Queue(TKey key, int priority, LockStatus lockStatus) {
+ assert priority >= 1 : "priority must be greater than or equal to 1";
+ this.key = key;
+ this.priority = priority;
+ this.lockStatus = lockStatus;
+ }
+
+ protected TKey getKey() {
+ return key;
+ }
+
+ public int getPriority() {
+ return priority;
+ }
+
+ protected LockStatus getLockStatus() {
+ return lockStatus;
+ }
+
+ // This should go away when we have the new AM and its events
+ // and we move xlock to the lock-event-queue.
+ public boolean isAvailable() {
+ return !lockStatus.hasExclusiveLock() && !isEmpty();
+ }
+
+ // ======================================================================
+ // Functions to handle procedure queue
+ // ======================================================================
+ public void add(Procedure<?> proc, boolean addToFront) {
+ if (addToFront) {
+ runnables.addFirst(proc);
+ } else {
+ runnables.addLast(proc);
+ }
+ }
+
+ public Procedure<?> peek() {
+ return runnables.peek();
+ }
+
+ public Procedure<?> poll() {
+ return runnables.poll();
+ }
+
+ public boolean isEmpty() {
+ return runnables.isEmpty();
+ }
+
+ public int size() {
+ return runnables.size();
+ }
+
+ // ======================================================================
+ // Generic Helpers
+ // ======================================================================
+ public int compareKey(TKey cmpKey) {
+ return key.compareTo(cmpKey);
+ }
+
+ @Override
+ public int compareTo(Queue<TKey> other) {
+ return compareKey(other.key);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s(%s, xlock=%s sharedLock=%s size=%s)", getClass().getSimpleName(), key,
+ lockStatus.hasExclusiveLock() ? "true (" + lockStatus.getExclusiveLockProcIdOwner() + ")"
+ : "false",
+ lockStatus.getSharedLockCount(), size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c18e7a96/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java
new file mode 100644
index 0000000..5dcc121
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.locking.LockProcedure;
+import org.apache.hadoop.hbase.procedure2.LockAndQueue;
+import org.apache.hadoop.hbase.procedure2.LockType;
+import org.apache.hadoop.hbase.procedure2.LockedResource;
+import org.apache.hadoop.hbase.procedure2.LockedResourceType;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * <p>
+ * Locks on servers, namespaces, tables, regions and peers.
+ * </p>
+ * <p>
+ * Since LockAndQueue implementation is NOT thread-safe, schedLock() guards all calls to these
+ * locks.
+ * </p>
+ */
+@InterfaceAudience.Private
+class SchemaLocking {
+ private final Map<ServerName, LockAndQueue> serverLocks = new HashMap<>();
+ private final Map<String, LockAndQueue> namespaceLocks = new HashMap<>();
+ private final Map<TableName, LockAndQueue> tableLocks = new HashMap<>();
+ // Single map for all regions irrespective of tables. Key is encoded region name.
+ private final Map<String, LockAndQueue> regionLocks = new HashMap<>();
+ private final Map<String, LockAndQueue> peerLocks = new HashMap<>();
+
+ private <T> LockAndQueue getLock(Map<T, LockAndQueue> map, T key) {
+ LockAndQueue lock = map.get(key);
+ if (lock == null) {
+ lock = new LockAndQueue();
+ map.put(key, lock);
+ }
+ return lock;
+ }
+
+ LockAndQueue getTableLock(TableName tableName) {
+ return getLock(tableLocks, tableName);
+ }
+
+ LockAndQueue removeTableLock(TableName tableName) {
+ return tableLocks.remove(tableName);
+ }
+
+ LockAndQueue getNamespaceLock(String namespace) {
+ return getLock(namespaceLocks, namespace);
+ }
+
+ LockAndQueue getRegionLock(String encodedRegionName) {
+ return getLock(regionLocks, encodedRegionName);
+ }
+
+ LockAndQueue removeRegionLock(String encodedRegionName) {
+ return regionLocks.remove(encodedRegionName);
+ }
+
+ LockAndQueue getServerLock(ServerName serverName) {
+ return getLock(serverLocks, serverName);
+ }
+
+ LockAndQueue getPeerLock(String peerId) {
+ return getLock(peerLocks, peerId);
+ }
+
+ LockAndQueue removePeerLock(String peerId) {
+ return peerLocks.remove(peerId);
+ }
+
+ private LockedResource createLockedResource(LockedResourceType resourceType, String resourceName,
+ LockAndQueue queue) {
+ LockType lockType;
+ Procedure<?> exclusiveLockOwnerProcedure;
+ int sharedLockCount;
+
+ if (queue.hasExclusiveLock()) {
+ lockType = LockType.EXCLUSIVE;
+ exclusiveLockOwnerProcedure = queue.getExclusiveLockOwnerProcedure();
+ sharedLockCount = 0;
+ } else {
+ lockType = LockType.SHARED;
+ exclusiveLockOwnerProcedure = null;
+ sharedLockCount = queue.getSharedLockCount();
+ }
+
+ List<Procedure<?>> waitingProcedures = new ArrayList<>();
+
+ for (Procedure<?> procedure : queue) {
+ if (!(procedure instanceof LockProcedure)) {
+ continue;
+ }
+
+ waitingProcedures.add(procedure);
+ }
+
+ return new LockedResource(resourceType, resourceName, lockType, exclusiveLockOwnerProcedure,
+ sharedLockCount, waitingProcedures);
+ }
+
+ private <T> void addToLockedResources(List<LockedResource> lockedResources,
+ Map<T, LockAndQueue> locks, Function<T, String> keyTransformer,
+ LockedResourceType resourcesType) {
+ locks.entrySet().stream().filter(e -> e.getValue().isLocked())
+ .map(e -> createLockedResource(resourcesType, keyTransformer.apply(e.getKey()), e.getValue()))
+ .forEachOrdered(lockedResources::add);
+ }
+
+ /**
+ * List the resources which are currently locked.
+ * @return the locks
+ */
+ List<LockedResource> getLocks() {
+ List<LockedResource> lockedResources = new ArrayList<>();
+ addToLockedResources(lockedResources, serverLocks, sn -> sn.getServerName(),
+ LockedResourceType.SERVER);
+ addToLockedResources(lockedResources, namespaceLocks, Function.identity(),
+ LockedResourceType.NAMESPACE);
+ addToLockedResources(lockedResources, tableLocks, tn -> tn.getNameAsString(),
+ LockedResourceType.TABLE);
+ addToLockedResources(lockedResources, regionLocks, Function.identity(),
+ LockedResourceType.REGION);
+ addToLockedResources(lockedResources, peerLocks, Function.identity(), LockedResourceType.PEER);
+ return lockedResources;
+ }
+
+ /**
+ * @return {@link LockedResource} for resource of specified type and name. null if there is no
+ * lock entry for the resource.
+ */
+ LockedResource getLockResource(LockedResourceType resourceType, String resourceName) {
+ LockAndQueue queue;
+ switch (resourceType) {
+ case SERVER:
+ queue = serverLocks.get(ServerName.valueOf(resourceName));
+ break;
+ case NAMESPACE:
+ queue = namespaceLocks.get(resourceName);
+ break;
+ case TABLE:
+ queue = tableLocks.get(TableName.valueOf(resourceName));
+ break;
+ case REGION:
+ queue = regionLocks.get(resourceName);
+ break;
+ case PEER:
+ queue = peerLocks.get(resourceName);
+ break;
+ default:
+ queue = null;
+ break;
+ }
+ return queue != null ? createLockedResource(resourceType, resourceName, queue) : null;
+ }
+
+ /**
+ * Removes all locks by clearing the maps. Used when procedure executor is stopped for failure and
+ * recovery testing.
+ */
+ void clear() {
+ serverLocks.clear();
+ namespaceLocks.clear();
+ tableLocks.clear();
+ regionLocks.clear();
+ peerLocks.clear();
+ }
+
+ @Override
+ public String toString() {
+ return "serverLocks=" + filterUnlocked(this.serverLocks) + ", namespaceLocks=" +
+ filterUnlocked(this.namespaceLocks) + ", tableLocks=" + filterUnlocked(this.tableLocks) +
+ ", regionLocks=" + filterUnlocked(this.regionLocks) + ", peerLocks=" +
+ filterUnlocked(this.peerLocks);
+ }
+
+ private String filterUnlocked(Map<?, LockAndQueue> locks) {
+ StringBuilder sb = new StringBuilder("{");
+ int initialLength = sb.length();
+ for (Map.Entry<?, LockAndQueue> entry : locks.entrySet()) {
+ if (!entry.getValue().isLocked()) {
+ continue;
+ }
+ if (sb.length() > initialLength) {
+ sb.append(", ");
+ }
+ sb.append("{").append(entry.getKey()).append("=").append(entry.getValue()).append("}");
+ }
+ sb.append("}");
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c18e7a96/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
new file mode 100644
index 0000000..5526f3b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.procedure2.LockStatus;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+class ServerQueue extends Queue<ServerName> {
+
+ public ServerQueue(ServerName serverName, LockStatus serverLock) {
+ super(serverName, serverLock);
+ }
+
+ @Override
+ public boolean requireExclusiveLock(Procedure<?> proc) {
+ ServerProcedureInterface spi = (ServerProcedureInterface) proc;
+ switch (spi.getServerOperationType()) {
+ case CRASH_HANDLER:
+ return true;
+ default:
+ break;
+ }
+ throw new UnsupportedOperationException("unexpected type " + spi.getServerOperationType());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c18e7a96/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java
new file mode 100644
index 0000000..106dfc3
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.procedure2.LockStatus;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+class TableQueue extends Queue<TableName> {
+ private final LockStatus namespaceLockStatus;
+
+ public TableQueue(TableName tableName, int priority, LockStatus tableLock,
+ LockStatus namespaceLockStatus) {
+ super(tableName, priority, tableLock);
+ this.namespaceLockStatus = namespaceLockStatus;
+ }
+
+ @Override
+ public boolean isAvailable() {
+ // if there are no items in the queue, or the namespace is exclusively locked,
+ // we can't execute any operation on this table
+ if (isEmpty() || namespaceLockStatus.hasExclusiveLock()) {
+ return false;
+ }
+
+ if (getLockStatus().hasExclusiveLock()) {
+ // if we have an exclusive lock already taken
+ // only child of the lock owner can be executed
+ final Procedure<?> nextProc = peek();
+ return nextProc != null && getLockStatus().hasLockAccess(nextProc);
+ }
+
+ // no xlock
+ return true;
+ }
+
+ @Override
+ public boolean requireExclusiveLock(Procedure<?> proc) {
+ return requireTableExclusiveLock((TableProcedureInterface) proc);
+ }
+
+ /**
+ * @param proc must not be null
+ */
+ private static boolean requireTableExclusiveLock(TableProcedureInterface proc) {
+ switch (proc.getTableOperationType()) {
+ case CREATE:
+ case DELETE:
+ case DISABLE:
+ case ENABLE:
+ return true;
+ case EDIT:
+ // we allow concurrent edit on the NS table
+ return !proc.getTableName().equals(TableName.NAMESPACE_TABLE_NAME);
+ case READ:
+ return false;
+ // region operations are using the shared-lock on the table
+ // and then they will grab an xlock on the region.
+ case REGION_SPLIT:
+ case REGION_MERGE:
+ case REGION_ASSIGN:
+ case REGION_UNASSIGN:
+ case REGION_EDIT:
+ case REGION_GC:
+ case MERGED_REGIONS_GC:
+ return false;
+ default:
+ break;
+ }
+ throw new UnsupportedOperationException("unexpected type " + proc.getTableOperationType());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c18e7a96/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureSchedulerPerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureSchedulerPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureSchedulerPerformanceEvaluation.java
index e5d3a79..d86d083 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureSchedulerPerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureSchedulerPerformanceEvaluation.java
@@ -256,7 +256,7 @@ public class MasterProcedureSchedulerPerformanceEvaluation extends AbstractHBase
@Override
protected int doWork() throws Exception {
- procedureScheduler = new MasterProcedureScheduler(UTIL.getConfiguration());
+ procedureScheduler = new MasterProcedureScheduler();
procedureScheduler.start();
setupOperations();
http://git-wip-us.apache.org/repos/asf/hbase/blob/c18e7a96/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
index 05bb637..65757db 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
@@ -17,16 +17,17 @@
*/
package org.apache.hadoop.hbase.master.procedure;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@@ -60,15 +61,13 @@ public class TestMasterProcedureScheduler {
private static final Logger LOG = LoggerFactory.getLogger(TestMasterProcedureScheduler.class);
private MasterProcedureScheduler queue;
- private Configuration conf;
@Rule
public TestName name = new TestName();
@Before
public void setUp() throws IOException {
- conf = HBaseConfiguration.create();
- queue = new MasterProcedureScheduler(conf);
+ queue = new MasterProcedureScheduler();
queue.start();
}
@@ -283,26 +282,24 @@ public class TestMasterProcedureScheduler {
// Fetch the 1st item and take the write lock
Procedure procNs1 = queue.poll();
assertEquals(1, procNs1.getProcId());
- assertEquals(false, queue.waitNamespaceExclusiveLock(procNs1, nsName1));
+ assertFalse(queue.waitNamespaceExclusiveLock(procNs1, nsName1));
- // System tables have 2 as default priority
+ // namespace table has higher priority so we still return procedure for it
Procedure procNs2 = queue.poll();
assertEquals(4, procNs2.getProcId());
- assertEquals(false, queue.waitNamespaceExclusiveLock(procNs2, nsName2));
+ assertFalse(queue.waitNamespaceExclusiveLock(procNs2, nsName2));
queue.wakeNamespaceExclusiveLock(procNs2, nsName2);
// add procNs2 back in the queue
queue.yield(procNs2);
- // table on ns1 is locked, so we get table on ns2
+ // procNs2 has been yielded back, so we get it again
procNs2 = queue.poll();
- assertEquals(3, procNs2.getProcId());
- assertEquals(false, queue.waitTableExclusiveLock(procNs2, tableName2));
+ assertEquals(4, procNs2.getProcId());
+ assertFalse(queue.waitNamespaceExclusiveLock(procNs2, nsName2));
- // ns2 is not available (TODO we may avoid this one)
- Procedure procNs2b = queue.poll();
- assertEquals(4, procNs2b.getProcId());
- assertEquals(true, queue.waitNamespaceExclusiveLock(procNs2b, nsName2));
+ // ns1 and ns2 are both locked so we get nothing
+ assertNull(queue.poll());
// release the ns1 lock
queue.wakeNamespaceExclusiveLock(procNs1, nsName1);
@@ -312,11 +309,11 @@ public class TestMasterProcedureScheduler {
assertEquals(2, procId);
// release ns2
- queue.wakeTableExclusiveLock(procNs2, tableName2);
+ queue.wakeNamespaceExclusiveLock(procNs2, nsName2);
- // we are now able to execute ns2
+ // we are now able to execute table of ns2
procId = queue.poll().getProcId();
- assertEquals(4, procId);
+ assertEquals(3, procId);
}
@Test
http://git-wip-us.apache.org/repos/asf/hbase/blob/c18e7a96/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java
index 9c5b602..1313cdb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java
@@ -24,9 +24,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType;
import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestPeerProcedure;
@@ -53,12 +51,10 @@ public class TestMasterProcedureSchedulerConcurrency {
LoggerFactory.getLogger(TestMasterProcedureSchedulerConcurrency.class);
private MasterProcedureScheduler queue;
- private Configuration conf;
@Before
public void setUp() throws IOException {
- conf = HBaseConfiguration.create();
- queue = new MasterProcedureScheduler(conf);
+ queue = new MasterProcedureScheduler();
queue.start();
}