You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2018/08/04 03:24:52 UTC

hbase git commit: HBASE-21009 Backport to branch-2.0 HBASE-20739 Add priority for SCP Signed-off-by: Michael Stack

Repository: hbase
Updated Branches:
  refs/heads/branch-2.0 013ea3e3d -> ecabd9c32


HBASE-21009 Backport to branch-2.0 HBASE-20739 Add priority for SCP
Signed-off-by: Michael Stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ecabd9c3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ecabd9c3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ecabd9c3

Branch: refs/heads/branch-2.0
Commit: ecabd9c32ec5ce258df29058f3803fcc7fec35fd
Parents: 013ea3e
Author: zhangduo <zh...@apache.org>
Authored: Tue Jun 19 16:29:01 2018 +0800
Committer: Michael Stack <st...@apache.org>
Committed: Fri Aug 3 16:43:00 2018 -0700

----------------------------------------------------------------------
 .../hbase/master/procedure/FairQueue.java       |  6 +-
 .../procedure/MasterProcedureScheduler.java     | 59 +++++++++++++++++---
 .../master/procedure/MasterProcedureUtil.java   |  9 ++-
 .../hbase/master/procedure/SchemaLocking.java   |  4 ++
 .../hbase/master/procedure/ServerQueue.java     |  4 +-
 5 files changed, 65 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ecabd9c3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java
index ac8e577..59a5ef8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java
@@ -38,8 +38,10 @@ public class FairQueue<T extends Comparable<T>> {
       return;
     }
     // Find the one which priority is less than us
-    // For now only TableQueue has priority, and there are only a small number of tables which
-    // have higher priority so this will not be an expensive operation.
+    // For now only TableQueue and ServerQueue have priority. For TableQueue there are only a small
+    // number of tables which have higher priority, and for ServerQueue there is only one server
+    // which could carry meta which leads to a higher priority, so this will not be an expensive
+    // operation.
     Queue<T> base = queueHead;
     do {
       if (base.getPriority() < queue.getPriority()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecabd9c3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
index f981bde..8ff7fe9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
@@ -123,7 +123,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
     } else if (isTableProcedure(proc)) {
       doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
     } else if (isServerProcedure(proc)) {
-      doAdd(serverRunQueue, getServerQueue(getServerName(proc)), proc, addFront);
+      ServerProcedureInterface spi = (ServerProcedureInterface) proc;
+      doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
     } else {
       // TODO: at the moment we only have Table and Server procedures
       // if you are implementing a non-table/non-server procedure, you have two options: create
@@ -301,8 +302,10 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
         markTableAsDeleted(iProcTable.getTableName(), proc);
         return;
       }
+    } else if (proc instanceof ServerProcedureInterface) {
+      tryCleanupServerQueue(getServerName(proc), proc);
     } else {
-      // No cleanup for ServerProcedureInterface types, yet.
+      // No cleanup for other procedure types, yet.
       return;
     }
   }
@@ -350,16 +353,52 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
   // ============================================================================
   //  Server Queue Lookup Helpers
   // ============================================================================
-  private ServerQueue getServerQueue(ServerName serverName) {
+  private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) {
     final int index = getBucketIndex(serverBuckets, serverName.hashCode());
     ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
-    if (node != null) return node;
-
-    node = new ServerQueue(serverName, locking.getServerLock(serverName));
+    if (node != null) {
+      return node;
+    }
+    int priority;
+    if (proc != null) {
+      priority = MasterProcedureUtil.getServerPriority(proc);
+    } else {
+      LOG.warn("Usually this should not happen as proc can only be null when calling from " +
+        "wait/wake lock, which means at least we should have one procedure in the queue which " +
+        "wants to acquire the lock or just released the lock.");
+      priority = 1;
+    }
+    node = new ServerQueue(serverName, priority, locking.getServerLock(serverName));
     serverBuckets[index] = AvlTree.insert(serverBuckets[index], node);
     return node;
   }
 
+  private void removeServerQueue(ServerName serverName) {
+    int index = getBucketIndex(serverBuckets, serverName.hashCode());
+    serverBuckets[index] =
+      AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
+    locking.removeServerLock(serverName);
+  }
+
+  private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) {
+    schedLock();
+    try {
+      int index = getBucketIndex(serverBuckets, serverName.hashCode());
+      ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
+      if (node == null) {
+        return;
+      }
+
+      LockAndQueue lock = locking.getServerLock(serverName);
+      if (node.isEmpty() && lock.tryExclusiveLock(proc)) {
+        removeFromRunQueue(serverRunQueue, node);
+        removeServerQueue(serverName);
+      }
+    } finally {
+      schedUnlock();
+    }
+  }
+
   private static int getBucketIndex(Object[] buckets, int hashCode) {
     return Math.abs(hashCode) % buckets.length;
   }
@@ -749,7 +788,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
     try {
       final LockAndQueue lock = locking.getServerLock(serverName);
       if (lock.tryExclusiveLock(procedure)) {
-        removeFromRunQueue(serverRunQueue, getServerQueue(serverName));
+        // We do not need to create a new queue so just pass null, as in tests we may pass
+        // procedures other than ServerProcedureInterface
+        removeFromRunQueue(serverRunQueue, getServerQueue(serverName, null));
         return false;
       }
       waitProcedure(lock, procedure);
@@ -771,7 +812,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
     try {
       final LockAndQueue lock = locking.getServerLock(serverName);
       lock.releaseExclusiveLock(procedure);
-      addToRunQueue(serverRunQueue, getServerQueue(serverName));
+      // We do not need to create a new queue so just pass null, as in tests we may pass procedures
+      // other than ServerProcedureInterface
+      addToRunQueue(serverRunQueue, getServerQueue(serverName, null));
       int waitingCount = wakeWaitingProcedures(lock);
       wakePollIfNeeded(waitingCount);
     } finally {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecabd9c3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
index 51e2452..587cc82 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
@@ -170,11 +170,10 @@ public final class MasterProcedureUtil {
   }
 
   /**
-   * Return the total levels of table priority. Now we have 3 levels, for meta table, other system
-   * tables and user tables. Notice that the actual value of priority should be decreased from this
-   * value down to 1.
+   * Return the priority for the given procedure. For now we only have two priorities, 100 for
+   * server carrying meta, and 1 for others.
    */
-  public static int getTablePriorityLevels() {
-    return 3;
+  public static int getServerPriority(ServerProcedureInterface proc) {
+    return proc.hasMetaTableRegion() ? 100 : 1;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecabd9c3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java
index b0a7e99..71c6c90 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java
@@ -93,6 +93,10 @@ class SchemaLocking {
     return getLock(serverLocks, serverName);
   }
 
+  LockAndQueue removeServerLock(ServerName serverName) {
+    return serverLocks.remove(serverName);
+  }
+
   private LockedResource createLockedResource(LockedResourceType resourceType, String resourceName,
       LockAndQueue queue) {
     LockType lockType;

http://git-wip-us.apache.org/repos/asf/hbase/blob/ecabd9c3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
index 5526f3b..3a1b3c4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
@@ -25,8 +25,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 class ServerQueue extends Queue<ServerName> {
 
-  public ServerQueue(ServerName serverName, LockStatus serverLock) {
-    super(serverName, serverLock);
+  public ServerQueue(ServerName serverName, int priority, LockStatus serverLock) {
+    super(serverName, priority, serverLock);
   }
 
   @Override