You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2023/05/15 14:54:05 UTC

[hbase] 05/16: HBASE-27392 Add a new procedure type for implementing some global operations such as migration (#4803)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit c01c8e45b479897ce13cdbe690b7b3a30f5a948b
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Thu Sep 29 10:08:02 2022 +0800

    HBASE-27392 Add a new procedure type for implementing some global operations such as migration (#4803)
    
    Signed-off-by: Xin Sun <dd...@gmail.com>
---
 .../hbase/procedure2/LockedResourceType.java       |   3 +-
 .../master/procedure/GlobalProcedureInterface.java |  15 ++-
 .../hadoop/hbase/master/procedure/GlobalQueue.java |  21 ++--
 .../master/procedure/MasterProcedureScheduler.java | 119 ++++++++++++++++++++-
 .../hbase/master/procedure/SchemaLocking.java      |  18 +++-
 .../procedure/TestMasterProcedureScheduler.java    |  48 +++++++++
 6 files changed, 202 insertions(+), 22 deletions(-)

diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java
index 12f899d7565..40141017009 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java
@@ -26,5 +26,6 @@ public enum LockedResourceType {
   TABLE,
   REGION,
   PEER,
-  META
+  META,
+  GLOBAL
 }
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/GlobalProcedureInterface.java
similarity index 82%
copy from hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java
copy to hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/GlobalProcedureInterface.java
index 12f899d7565..1ef168abfd8 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/GlobalProcedureInterface.java
@@ -15,16 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.procedure2;
+package org.apache.hadoop.hbase.master.procedure;
 
 import org.apache.yetus.audience.InterfaceAudience;
 
+/**
+ * Procedure interface for global operations, such as migration.
+ */
 @InterfaceAudience.Private
-public enum LockedResourceType {
-  SERVER,
-  NAMESPACE,
-  TABLE,
-  REGION,
-  PEER,
-  META
+public interface GlobalProcedureInterface {
+  /** Returns the id of the global resource this procedure operates on. */
+  String getGlobalId();
 }
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/GlobalQueue.java
similarity index 69%
copy from hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java
copy to hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/GlobalQueue.java
index 12f899d7565..1633dc4856e 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/GlobalQueue.java
@@ -15,16 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.procedure2;
+package org.apache.hadoop.hbase.master.procedure;
 
+import org.apache.hadoop.hbase.procedure2.LockStatus;
+import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
-public enum LockedResourceType {
-  SERVER,
-  NAMESPACE,
-  TABLE,
-  REGION,
-  PEER,
-  META
+public class GlobalQueue extends Queue<String> {
+  /** Creates the queue for {@code globalId}; both arguments are passed straight to Queue. */
+  public GlobalQueue(String globalId, LockStatus lockStatus) {
+    super(globalId, lockStatus);
+  }
+
+  @Override
+  boolean requireExclusiveLock(Procedure<?> proc) {
+    return true; // unconditional: every procedure in this queue needs the exclusive lock
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
index 866f2f6f403..fbf0eb8abf3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableName;
@@ -95,16 +96,20 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
     (n, k) -> n.compareKey((String) k);
   private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR =
     (n, k) -> n.compareKey((TableName) k);
+  private final static AvlKeyComparator<GlobalQueue> GLOBAL_QUEUE_KEY_COMPARATOR =
+    (n, k) -> n.compareKey((String) k);
 
   private final FairQueue<ServerName> serverRunQueue = new FairQueue<>();
   private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
   private final FairQueue<String> peerRunQueue = new FairQueue<>();
   private final FairQueue<TableName> metaRunQueue = new FairQueue<>();
+  private final FairQueue<String> globalRunQueue = new FairQueue<>();
 
   private final ServerQueue[] serverBuckets = new ServerQueue[128];
   private TableQueue tableMap = null;
   private PeerQueue peerMap = null;
   private MetaQueue metaMap = null;
+  private GlobalQueue globalMap = null;
 
   private final SchemaLocking locking;
 
@@ -128,6 +133,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
       doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
     } else if (isPeerProcedure(proc)) {
       doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
+    } else if (isGlobalProcedure(proc)) {
+      doAdd(globalRunQueue, getGlobalQueue(getGlobalId(proc)), proc, addFront);
     } else {
       // TODO: at the moment we only have Table and Server procedures
       // if you are implementing a non-table/non-server procedure, you have two options: create
@@ -163,14 +170,19 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
 
   @Override
   protected boolean queueHasRunnables() {
-    return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables()
-      || serverRunQueue.hasRunnables() || peerRunQueue.hasRunnables();
+    return globalRunQueue.hasRunnables() || metaRunQueue.hasRunnables()
+      || tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables()
+      || peerRunQueue.hasRunnables();
   }
 
   @Override
   protected Procedure dequeue() {
-    // meta procedure is always the first priority
-    Procedure<?> pollResult = doPoll(metaRunQueue);
+    // pull global first
+    Procedure<?> pollResult = doPoll(globalRunQueue);
+    // then meta procedure
+    if (pollResult == null) {
+      pollResult = doPoll(metaRunQueue);
+    }
     // For now, let server handling have precedence over table handling; presumption is that it
     // is more important handling crashed servers than it is running the
     // enabling/disabling tables, etc.
@@ -268,6 +280,14 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
     clear(peerMap, peerRunQueue, PEER_QUEUE_KEY_COMPARATOR);
     peerMap = null;
 
+    // Remove Meta
+    clear(metaMap, metaRunQueue, META_QUEUE_KEY_COMPARATOR);
+    metaMap = null;
+
+    // Remove Global
+    clear(globalMap, globalRunQueue, GLOBAL_QUEUE_KEY_COMPARATOR);
+    globalMap = null;
+
     assert size() == 0 : "expected queue size to be 0, got " + size();
   }
 
@@ -300,6 +320,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
     count += queueSize(tableMap);
     count += queueSize(peerMap);
     count += queueSize(metaMap);
+    count += queueSize(globalMap);
     return count;
   }
 
@@ -502,6 +523,51 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
     return proc instanceof MetaProcedureInterface;
   }
 
+  // ============================================================================
+  // Global Queue Lookup Helpers
+  // ============================================================================
+  private GlobalQueue getGlobalQueue(String globalId) {
+    GlobalQueue queue = AvlTree.get(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR);
+    if (queue == null) {
+      // not in the tree yet: build the queue around this id's lock and insert it
+      queue = new GlobalQueue(globalId, locking.getGlobalLock(globalId));
+      globalMap = AvlTree.insert(globalMap, queue);
+    }
+    return queue;
+  }
+
+  private void removeGlobalQueue(String globalId) {
+    globalMap = AvlTree.remove(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR);
+    locking.removeGlobalLock(globalId); // also discard the lock entry stored under this id
+  }
+
+  private void tryCleanupGlobalQueue(String globalId, Procedure<?> procedure) {
+    schedLock();
+    try {
+      GlobalQueue queue = AvlTree.get(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR);
+      if (queue == null) {
+        return; // no queue registered for this id, nothing to clean up
+      }
+      // drop the queue and its lock only when it is empty and the exclusive lock can be taken
+      final LockAndQueue lock = locking.getGlobalLock(globalId);
+      if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) {
+        removeFromRunQueue(globalRunQueue, queue,
+          () -> "clean up global queue after " + procedure + " completed");
+        removeGlobalQueue(globalId);
+      }
+    } finally {
+      schedUnlock();
+    }
+  }
+
+  private static boolean isGlobalProcedure(Procedure<?> proc) {
+    return proc instanceof GlobalProcedureInterface; // global procedures implement this type
+  }
+
+  private static String getGlobalId(Procedure<?> proc) {
+    return ((GlobalProcedureInterface) proc).getGlobalId(); // call only after isGlobalProcedure
+  }
+
   // ============================================================================
   // Table Locking Helpers
   // ============================================================================
@@ -1006,6 +1072,51 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
     }
   }
 
+  // ============================================================================
+  // Global Locking Helpers
+  // ============================================================================
+  /**
+   * Try to acquire the exclusive lock on the global resource identified by {@code globalId}.
+   * @see #wakeGlobalExclusiveLock(Procedure, String)
+   * @param procedure the procedure trying to acquire the lock
+   * @return true if the procedure has to wait for global to be available
+   */
+  public boolean waitGlobalExclusiveLock(Procedure<?> procedure, String globalId) {
+    schedLock();
+    try {
+      final LockAndQueue lock = locking.getGlobalLock(globalId);
+      if (lock.tryExclusiveLock(procedure)) {
+        removeFromRunQueue(globalRunQueue, getGlobalQueue(globalId),
+          () -> procedure + " held exclusive lock");
+        return false; // lock acquired: the caller does not have to wait
+      }
+      waitProcedure(lock, procedure);
+      logLockedResource(LockedResourceType.GLOBAL, globalId); // log the lock actually contended
+      return true;
+    } finally {
+      schedUnlock();
+    }
+  }
+
+  /**
+   * Release the exclusive lock on {@code globalId} and wake the procedures waiting for it.
+   * @see #waitGlobalExclusiveLock(Procedure, String)
+   * @param procedure the procedure releasing the lock
+   */
+  public void wakeGlobalExclusiveLock(Procedure<?> procedure, String globalId) {
+    schedLock();
+    try {
+      final LockAndQueue lock = locking.getGlobalLock(globalId);
+      lock.releaseExclusiveLock(procedure);
+      addToRunQueue(globalRunQueue, getGlobalQueue(globalId),
+        () -> procedure + " released exclusive lock");
+      int waitingCount = wakeWaitingProcedures(lock);
+      wakePollIfNeeded(waitingCount);
+    } finally {
+      schedUnlock();
+    }
+  }
+
   /**
    * For debugging. Expensive.
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java
index 13419ac455c..853d13b0c93 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java
@@ -53,6 +53,7 @@ class SchemaLocking {
   // Single map for all regions irrespective of tables. Key is encoded region name.
   private final Map<String, LockAndQueue> regionLocks = new HashMap<>();
   private final Map<String, LockAndQueue> peerLocks = new HashMap<>();
+  private final Map<String, LockAndQueue> globalLocks = new HashMap<>();
   private final LockAndQueue metaLock;
 
   public SchemaLocking(Function<Long, Procedure<?>> procedureRetriever) {
@@ -94,6 +95,10 @@ class SchemaLocking {
     return metaLock;
   }
 
+  LockAndQueue getGlobalLock(String globalId) {
+    return getLock(globalLocks, globalId); // presumably creates the entry if absent - confirm
+  }
+
   LockAndQueue removeRegionLock(String encodedRegionName) {
     return regionLocks.remove(encodedRegionName);
   }
@@ -114,6 +119,10 @@ class SchemaLocking {
     return peerLocks.remove(peerId);
   }
 
+  LockAndQueue removeGlobalLock(String globalId) {
+    return globalLocks.remove(globalId); // null when no lock was stored for globalId
+  }
+
   private LockedResource createLockedResource(LockedResourceType resourceType, String resourceName,
     LockAndQueue queue) {
     LockType lockType;
@@ -164,6 +173,8 @@ class SchemaLocking {
     addToLockedResources(lockedResources, peerLocks, Function.identity(), LockedResourceType.PEER);
     addToLockedResources(lockedResources, ImmutableMap.of(TableName.META_TABLE_NAME, metaLock),
       tn -> tn.getNameAsString(), LockedResourceType.META);
+    addToLockedResources(lockedResources, globalLocks, Function.identity(),
+      LockedResourceType.GLOBAL);
     return lockedResources;
   }
 
@@ -191,6 +202,10 @@ class SchemaLocking {
         break;
       case META:
         queue = metaLock;
+        break;
+      case GLOBAL:
+        queue = globalLocks.get(resourceName);
+        break;
       default:
         queue = null;
         break;
@@ -216,7 +231,8 @@ class SchemaLocking {
       + filterUnlocked(this.namespaceLocks) + ", tableLocks=" + filterUnlocked(this.tableLocks)
       + ", regionLocks=" + filterUnlocked(this.regionLocks) + ", peerLocks="
       + filterUnlocked(this.peerLocks) + ", metaLocks="
-      + filterUnlocked(ImmutableMap.of(TableName.META_TABLE_NAME, metaLock));
+      + filterUnlocked(ImmutableMap.of(TableName.META_TABLE_NAME, metaLock)) + ", globalLocks="
+      + filterUnlocked(globalLocks);
   }
 
   private String filterUnlocked(Map<?, LockAndQueue> locks) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
index f0edf73715e..0cf34126a94 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
@@ -940,6 +940,21 @@ public class TestMasterProcedureScheduler {
     }
   }
 
+  public static class TestGlobalProcedure extends TestProcedure
+    implements GlobalProcedureInterface {
+    private final String globalId; // fixed at construction, handed back by getGlobalId()
+
+    public TestGlobalProcedure(long procId, String globalId) {
+      super(procId);
+      this.globalId = globalId;
+    }
+
+    @Override
+    public String getGlobalId() {
+      return globalId;
+    }
+  }
+
   private static LockProcedure createLockProcedure(LockType lockType, long procId)
     throws Exception {
     LockProcedure procedure = new LockProcedure();
@@ -1093,6 +1108,39 @@ public class TestMasterProcedureScheduler {
     assertEquals(1, resource.getWaitingProcedures().size());
   }
 
+  @Test
+  public void testListLocksGlobal() throws Exception {
+    String globalId = "1";
+    LockProcedure procedure = createExclusiveLockProcedure(4);
+    queue.waitGlobalExclusiveLock(procedure, globalId);
+
+    List<LockedResource> locks = queue.getLocks();
+    assertEquals(1, locks.size());
+
+    LockedResource resource = locks.get(0);
+    assertLockResource(resource, LockedResourceType.GLOBAL, globalId);
+    assertExclusiveLock(resource, procedure);
+    assertTrue(resource.getWaitingProcedures().isEmpty());
+
+    // Try to acquire the exclusive lock again with the same procedure
+    assertFalse(queue.waitGlobalExclusiveLock(procedure, globalId));
+
+    // Try to acquire the exclusive lock again with a new procedure
+    LockProcedure procedure2 = createExclusiveLockProcedure(5);
+    assertTrue(queue.waitGlobalExclusiveLock(procedure2, globalId));
+
+    // Same globalId, so there is still only 1 LockedResource
+    locks = queue.getLocks();
+    assertEquals(1, locks.size());
+
+    resource = locks.get(0);
+    assertLockResource(resource, LockedResourceType.GLOBAL, globalId);
+    // The LockedResource owner is still the original procedure
+    assertExclusiveLock(resource, procedure);
+    // The new procedure should be in the waiting list
+    assertEquals(1, resource.getWaitingProcedures().size());
+  }
+
   @Test
   public void testListLocksWaiting() throws Exception {
     LockProcedure procedure1 = createExclusiveLockProcedure(1);