You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2018/08/04 03:24:52 UTC
hbase git commit: HBASE-21009 Backport to branch-2.0 HBASE-20739 Add
priority for SCP Signed-off-by: Michael Stack
Repository: hbase
Updated Branches:
refs/heads/branch-2.0 013ea3e3d -> ecabd9c32
HBASE-21009 Backport to branch-2.0 HBASE-20739 Add priority for SCP
Signed-off-by: Michael Stack <st...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ecabd9c3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ecabd9c3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ecabd9c3
Branch: refs/heads/branch-2.0
Commit: ecabd9c32ec5ce258df29058f3803fcc7fec35fd
Parents: 013ea3e
Author: zhangduo <zh...@apache.org>
Authored: Tue Jun 19 16:29:01 2018 +0800
Committer: Michael Stack <st...@apache.org>
Committed: Fri Aug 3 16:43:00 2018 -0700
----------------------------------------------------------------------
.../hbase/master/procedure/FairQueue.java | 6 +-
.../procedure/MasterProcedureScheduler.java | 59 +++++++++++++++++---
.../master/procedure/MasterProcedureUtil.java | 9 ++-
.../hbase/master/procedure/SchemaLocking.java | 4 ++
.../hbase/master/procedure/ServerQueue.java | 4 +-
5 files changed, 65 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/ecabd9c3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java
index ac8e577..59a5ef8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java
@@ -38,8 +38,10 @@ public class FairQueue<T extends Comparable<T>> {
return;
}
// Find the one which priority is less than us
- // For now only TableQueue has priority, and there are only a small number of tables which
- // have higher priority so this will not be an expensive operation.
+ // For now only TableQueue and ServerQueue have priority. For TableQueue there are only a small
+ // number of tables which have higher priority, and for ServerQueue there is only one server
+ // which could carry meta which leads to a higher priority, so this will not be an expensive
+ // operation.
Queue<T> base = queueHead;
do {
if (base.getPriority() < queue.getPriority()) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/ecabd9c3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
index f981bde..8ff7fe9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
@@ -123,7 +123,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
} else if (isTableProcedure(proc)) {
doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
} else if (isServerProcedure(proc)) {
- doAdd(serverRunQueue, getServerQueue(getServerName(proc)), proc, addFront);
+ ServerProcedureInterface spi = (ServerProcedureInterface) proc;
+ doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
} else {
// TODO: at the moment we only have Table and Server procedures
// if you are implementing a non-table/non-server procedure, you have two options: create
@@ -301,8 +302,10 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
markTableAsDeleted(iProcTable.getTableName(), proc);
return;
}
+ } else if (proc instanceof ServerProcedureInterface) {
+ tryCleanupServerQueue(getServerName(proc), proc);
} else {
- // No cleanup for ServerProcedureInterface types, yet.
+ // No cleanup for other procedure types, yet.
return;
}
}
@@ -350,16 +353,52 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
// ============================================================================
// Server Queue Lookup Helpers
// ============================================================================
- private ServerQueue getServerQueue(ServerName serverName) {
+ private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) {
final int index = getBucketIndex(serverBuckets, serverName.hashCode());
ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
- if (node != null) return node;
-
- node = new ServerQueue(serverName, locking.getServerLock(serverName));
+ if (node != null) {
+ return node;
+ }
+ int priority;
+ if (proc != null) {
+ priority = MasterProcedureUtil.getServerPriority(proc);
+ } else {
+ LOG.warn("Usually this should not happen as proc can only be null when calling from " +
+ "wait/wake lock, which means at least we should have one procedure in the queue which " +
+ "wants to acquire the lock or just released the lock.");
+ priority = 1;
+ }
+ node = new ServerQueue(serverName, priority, locking.getServerLock(serverName));
serverBuckets[index] = AvlTree.insert(serverBuckets[index], node);
return node;
}
+ private void removeServerQueue(ServerName serverName) {
+ int index = getBucketIndex(serverBuckets, serverName.hashCode());
+ serverBuckets[index] =
+ AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
+ locking.removeServerLock(serverName);
+ }
+
+ private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) {
+ schedLock();
+ try {
+ int index = getBucketIndex(serverBuckets, serverName.hashCode());
+ ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
+ if (node == null) {
+ return;
+ }
+
+ LockAndQueue lock = locking.getServerLock(serverName);
+ if (node.isEmpty() && lock.tryExclusiveLock(proc)) {
+ removeFromRunQueue(serverRunQueue, node);
+ removeServerQueue(serverName);
+ }
+ } finally {
+ schedUnlock();
+ }
+ }
+
private static int getBucketIndex(Object[] buckets, int hashCode) {
return Math.abs(hashCode) % buckets.length;
}
@@ -749,7 +788,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
try {
final LockAndQueue lock = locking.getServerLock(serverName);
if (lock.tryExclusiveLock(procedure)) {
- removeFromRunQueue(serverRunQueue, getServerQueue(serverName));
+ // We do not need to create a new queue so just pass null, as in tests we may pass
+ // procedures other than ServerProcedureInterface
+ removeFromRunQueue(serverRunQueue, getServerQueue(serverName, null));
return false;
}
waitProcedure(lock, procedure);
@@ -771,7 +812,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
try {
final LockAndQueue lock = locking.getServerLock(serverName);
lock.releaseExclusiveLock(procedure);
- addToRunQueue(serverRunQueue, getServerQueue(serverName));
+ // We do not need to create a new queue so just pass null, as in tests we may pass procedures
+ // other than ServerProcedureInterface
+ addToRunQueue(serverRunQueue, getServerQueue(serverName, null));
int waitingCount = wakeWaitingProcedures(lock);
wakePollIfNeeded(waitingCount);
} finally {
http://git-wip-us.apache.org/repos/asf/hbase/blob/ecabd9c3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
index 51e2452..587cc82 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
@@ -170,11 +170,10 @@ public final class MasterProcedureUtil {
}
/**
- * Return the total levels of table priority. Now we have 3 levels, for meta table, other system
- * tables and user tables. Notice that the actual value of priority should be decreased from this
- * value down to 1.
+ * Return the priority for the given procedure. For now we only have two priorities, 100 for
+ * server carrying meta, and 1 for others.
*/
- public static int getTablePriorityLevels() {
- return 3;
+ public static int getServerPriority(ServerProcedureInterface proc) {
+ return proc.hasMetaTableRegion() ? 100 : 1;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/ecabd9c3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java
index b0a7e99..71c6c90 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java
@@ -93,6 +93,10 @@ class SchemaLocking {
return getLock(serverLocks, serverName);
}
+ LockAndQueue removeServerLock(ServerName serverName) {
+ return serverLocks.remove(serverName);
+ }
+
private LockedResource createLockedResource(LockedResourceType resourceType, String resourceName,
LockAndQueue queue) {
LockType lockType;
http://git-wip-us.apache.org/repos/asf/hbase/blob/ecabd9c3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
index 5526f3b..3a1b3c4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
@@ -25,8 +25,8 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
class ServerQueue extends Queue<ServerName> {
- public ServerQueue(ServerName serverName, LockStatus serverLock) {
- super(serverName, serverLock);
+ public ServerQueue(ServerName serverName, int priority, LockStatus serverLock) {
+ super(serverName, priority, serverLock);
}
@Override