You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2022/11/13 14:26:00 UTC
[hbase] 05/08: HBASE-27392 Add a new procedure type for implementing some global operations such as migration (#4803)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HBASE-27109/table_based_rqs
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit d2992c299bd02919eea21e6fd97d264137d67c6f
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Thu Sep 29 10:08:02 2022 +0800
HBASE-27392 Add a new procedure type for implementing some global operations such as migration (#4803)
Signed-off-by: Xin Sun <dd...@gmail.com>
---
.../hbase/procedure2/LockedResourceType.java | 3 +-
.../master/procedure/GlobalProcedureInterface.java | 15 ++-
.../hadoop/hbase/master/procedure/GlobalQueue.java | 21 ++--
.../master/procedure/MasterProcedureScheduler.java | 119 ++++++++++++++++++++-
.../hbase/master/procedure/SchemaLocking.java | 18 +++-
.../procedure/TestMasterProcedureScheduler.java | 48 +++++++++
6 files changed, 202 insertions(+), 22 deletions(-)
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java
index 12f899d7565..40141017009 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java
@@ -26,5 +26,6 @@ public enum LockedResourceType {
TABLE,
REGION,
PEER,
- META
+ META,
+ GLOBAL
}
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/GlobalProcedureInterface.java
similarity index 82%
copy from hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java
copy to hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/GlobalProcedureInterface.java
index 12f899d7565..1ef168abfd8 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/GlobalProcedureInterface.java
@@ -15,16 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hbase.procedure2;
+package org.apache.hadoop.hbase.master.procedure;
import org.apache.yetus.audience.InterfaceAudience;
+/**
+ * Procedure interface for global operations, such as migration.
+ */
@InterfaceAudience.Private
-public enum LockedResourceType {
- SERVER,
- NAMESPACE,
- TABLE,
- REGION,
- PEER,
- META
+public interface GlobalProcedureInterface {
+ /** Returns the id of the global resource; the scheduler keys this procedure's queue by it. */
+ String getGlobalId();
}
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/GlobalQueue.java
similarity index 69%
copy from hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java
copy to hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/GlobalQueue.java
index 12f899d7565..1633dc4856e 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/GlobalQueue.java
@@ -15,16 +15,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hbase.procedure2;
+package org.apache.hadoop.hbase.master.procedure;
+import org.apache.hadoop.hbase.procedure2.LockStatus;
+import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
-public enum LockedResourceType {
- SERVER,
- NAMESPACE,
- TABLE,
- REGION,
- PEER,
- META
+public class GlobalQueue extends Queue<String> {
+ /** Creates the queue keyed by {@code globalId}; both arguments are passed through to Queue. */
+ public GlobalQueue(String globalId, LockStatus lockStatus) {
+ super(globalId, lockStatus);
+ }
+
+ @Override
+ boolean requireExclusiveLock(Procedure<?> proc) {
+ return true; // every procedure in this queue needs the exclusive lock, whatever its type
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
index 866f2f6f403..fbf0eb8abf3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
@@ -95,16 +96,20 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
(n, k) -> n.compareKey((String) k);
private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR =
(n, k) -> n.compareKey((TableName) k);
+ private final static AvlKeyComparator<GlobalQueue> GLOBAL_QUEUE_KEY_COMPARATOR =
+ (n, k) -> n.compareKey((String) k);
private final FairQueue<ServerName> serverRunQueue = new FairQueue<>();
private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
private final FairQueue<String> peerRunQueue = new FairQueue<>();
private final FairQueue<TableName> metaRunQueue = new FairQueue<>();
+ private final FairQueue<String> globalRunQueue = new FairQueue<>();
private final ServerQueue[] serverBuckets = new ServerQueue[128];
private TableQueue tableMap = null;
private PeerQueue peerMap = null;
private MetaQueue metaMap = null;
+ private GlobalQueue globalMap = null;
private final SchemaLocking locking;
@@ -128,6 +133,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
} else if (isPeerProcedure(proc)) {
doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
+ } else if (isGlobalProcedure(proc)) {
+ doAdd(globalRunQueue, getGlobalQueue(getGlobalId(proc)), proc, addFront);
} else {
// TODO: at the moment we only have Table and Server procedures
// if you are implementing a non-table/non-server procedure, you have two options: create
@@ -163,14 +170,19 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
@Override
protected boolean queueHasRunnables() {
- return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables()
- || serverRunQueue.hasRunnables() || peerRunQueue.hasRunnables();
+ return globalRunQueue.hasRunnables() || metaRunQueue.hasRunnables()
+ || tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables()
+ || peerRunQueue.hasRunnables();
}
@Override
protected Procedure dequeue() {
- // meta procedure is always the first priority
- Procedure<?> pollResult = doPoll(metaRunQueue);
+ // pull global first
+ Procedure<?> pollResult = doPoll(globalRunQueue);
+ // then meta procedure
+ if (pollResult == null) {
+ pollResult = doPoll(metaRunQueue);
+ }
// For now, let server handling have precedence over table handling; presumption is that it
// is more important handling crashed servers than it is running the
// enabling/disabling tables, etc.
@@ -268,6 +280,14 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
clear(peerMap, peerRunQueue, PEER_QUEUE_KEY_COMPARATOR);
peerMap = null;
+ // Remove Meta
+ clear(metaMap, metaRunQueue, META_QUEUE_KEY_COMPARATOR);
+ metaMap = null;
+
+ // Remove Global
+ clear(globalMap, globalRunQueue, GLOBAL_QUEUE_KEY_COMPARATOR);
+ globalMap = null;
+
assert size() == 0 : "expected queue size to be 0, got " + size();
}
@@ -300,6 +320,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
count += queueSize(tableMap);
count += queueSize(peerMap);
count += queueSize(metaMap);
+ count += queueSize(globalMap);
return count;
}
@@ -502,6 +523,51 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
return proc instanceof MetaProcedureInterface;
}
+ // ============================================================================
+ // Global Queue Lookup Helpers
+ // ============================================================================
+ private GlobalQueue getGlobalQueue(String globalId) {
+ GlobalQueue queue = AvlTree.get(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR);
+ if (queue == null) {
+ // first request for this id: create the queue on its lock and register it in the tree
+ queue = new GlobalQueue(globalId, locking.getGlobalLock(globalId));
+ globalMap = AvlTree.insert(globalMap, queue);
+ }
+ return queue;
+ }
+
+ private void removeGlobalQueue(String globalId) {
+ globalMap = AvlTree.remove(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR);
+ locking.removeGlobalLock(globalId); // also forget the lock entry kept for this id
+ }
+
+ private void tryCleanupGlobalQueue(String globalId, Procedure<?> procedure) {
+ schedLock();
+ try {
+ GlobalQueue queue = AvlTree.get(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR);
+ if (queue == null) {
+ return;
+ }
+ // Drop the queue only if nothing is queued and this procedure can take the exclusive lock.
+ final LockAndQueue lock = locking.getGlobalLock(globalId);
+ if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) {
+ removeFromRunQueue(globalRunQueue, queue,
+ () -> "clean up global queue after " + procedure + " completed");
+ removeGlobalQueue(globalId);
+ }
+ } finally {
+ schedUnlock();
+ }
+ }
+
+ private static boolean isGlobalProcedure(Procedure<?> proc) {
+ return proc instanceof GlobalProcedureInterface;
+ }
+
+ private static String getGlobalId(Procedure<?> proc) {
+ return ((GlobalProcedureInterface) proc).getGlobalId(); // caller checks isGlobalProcedure
+ }
+
// ============================================================================
// Table Locking Helpers
// ============================================================================
@@ -1006,6 +1072,51 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
}
}
+ // ============================================================================
+ // Global Locking Helpers
+ // ============================================================================
+ /**
+ * Try to acquire the exclusive lock on the global resource identified by {@code globalId}.
+ * @see #wakeGlobalExclusiveLock(Procedure, String)
+ * @param procedure the procedure trying to acquire the lock
+ * @return true if the procedure has to wait for the lock, false if it now holds the lock
+ */
+ public boolean waitGlobalExclusiveLock(Procedure<?> procedure, String globalId) {
+ schedLock();
+ try {
+ final LockAndQueue lock = locking.getGlobalLock(globalId);
+ if (lock.tryExclusiveLock(procedure)) {
+ removeFromRunQueue(globalRunQueue, getGlobalQueue(globalId),
+ () -> procedure + " held exclusive lock");
+ return false;
+ }
+ waitProcedure(lock, procedure);
+ logLockedResource(LockedResourceType.GLOBAL, globalId); // look up by the lock's own key
+ return true;
+ } finally {
+ schedUnlock();
+ }
+ }
+
+ /**
+ * Release the exclusive lock on the global resource and wake the procedures waiting for it.
+ * @see #waitGlobalExclusiveLock(Procedure, String)
+ * @param procedure the procedure releasing the lock
+ */
+ public void wakeGlobalExclusiveLock(Procedure<?> procedure, String globalId) {
+ schedLock();
+ try {
+ final LockAndQueue lock = locking.getGlobalLock(globalId);
+ lock.releaseExclusiveLock(procedure);
+ addToRunQueue(globalRunQueue, getGlobalQueue(globalId),
+ () -> procedure + " released exclusive lock");
+ int waitingCount = wakeWaitingProcedures(lock);
+ wakePollIfNeeded(waitingCount);
+ } finally {
+ schedUnlock();
+ }
+ }
+
/**
* For debugging. Expensive.
*/
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java
index 13419ac455c..853d13b0c93 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java
@@ -53,6 +53,7 @@ class SchemaLocking {
// Single map for all regions irrespective of tables. Key is encoded region name.
private final Map<String, LockAndQueue> regionLocks = new HashMap<>();
private final Map<String, LockAndQueue> peerLocks = new HashMap<>();
+ private final Map<String, LockAndQueue> globalLocks = new HashMap<>();
private final LockAndQueue metaLock;
public SchemaLocking(Function<Long, Procedure<?>> procedureRetriever) {
@@ -94,6 +95,10 @@ class SchemaLocking {
return metaLock;
}
+ LockAndQueue getGlobalLock(String globalId) {
+ return getLock(globalLocks, globalId);
+ }
+
LockAndQueue removeRegionLock(String encodedRegionName) {
return regionLocks.remove(encodedRegionName);
}
@@ -114,6 +119,10 @@ class SchemaLocking {
return peerLocks.remove(peerId);
}
+ LockAndQueue removeGlobalLock(String globalId) {
+ return globalLocks.remove(globalId);
+ }
+
private LockedResource createLockedResource(LockedResourceType resourceType, String resourceName,
LockAndQueue queue) {
LockType lockType;
@@ -164,6 +173,8 @@ class SchemaLocking {
addToLockedResources(lockedResources, peerLocks, Function.identity(), LockedResourceType.PEER);
addToLockedResources(lockedResources, ImmutableMap.of(TableName.META_TABLE_NAME, metaLock),
tn -> tn.getNameAsString(), LockedResourceType.META);
+ addToLockedResources(lockedResources, globalLocks, Function.identity(),
+ LockedResourceType.GLOBAL);
return lockedResources;
}
@@ -191,6 +202,10 @@ class SchemaLocking {
break;
case META:
queue = metaLock;
+ break;
+ case GLOBAL:
+ queue = globalLocks.get(resourceName);
+ break;
default:
queue = null;
break;
@@ -216,7 +231,8 @@ class SchemaLocking {
+ filterUnlocked(this.namespaceLocks) + ", tableLocks=" + filterUnlocked(this.tableLocks)
+ ", regionLocks=" + filterUnlocked(this.regionLocks) + ", peerLocks="
+ filterUnlocked(this.peerLocks) + ", metaLocks="
- + filterUnlocked(ImmutableMap.of(TableName.META_TABLE_NAME, metaLock));
+ + filterUnlocked(ImmutableMap.of(TableName.META_TABLE_NAME, metaLock)) + ", globalLocks="
+ + filterUnlocked(globalLocks);
}
private String filterUnlocked(Map<?, LockAndQueue> locks) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
index f0edf73715e..0cf34126a94 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
@@ -940,6 +940,21 @@ public class TestMasterProcedureScheduler {
}
}
+ public static class TestGlobalProcedure extends TestProcedure
+ implements GlobalProcedureInterface {
+ private final String globalId;
+
+ public TestGlobalProcedure(long procId, String globalId) {
+ super(procId);
+ this.globalId = globalId;
+ }
+
+ @Override
+ public String getGlobalId() {
+ return globalId;
+ }
+ }
+
private static LockProcedure createLockProcedure(LockType lockType, long procId)
throws Exception {
LockProcedure procedure = new LockProcedure();
@@ -1093,6 +1108,39 @@ public class TestMasterProcedureScheduler {
assertEquals(1, resource.getWaitingProcedures().size());
}
+ @Test
+ public void testListLocksGlobal() throws Exception {
+ String globalId = "1";
+ LockProcedure procedure = createExclusiveLockProcedure(4);
+ queue.waitGlobalExclusiveLock(procedure, globalId);
+
+ List<LockedResource> locks = queue.getLocks();
+ assertEquals(1, locks.size());
+
+ LockedResource resource = locks.get(0);
+ assertLockResource(resource, LockedResourceType.GLOBAL, globalId);
+ assertExclusiveLock(resource, procedure);
+ assertTrue(resource.getWaitingProcedures().isEmpty());
+
+ // Try to acquire the exclusive lock again with the same procedure
+ assertFalse(queue.waitGlobalExclusiveLock(procedure, globalId));
+
+ // Try to acquire the exclusive lock again with a new procedure
+ LockProcedure procedure2 = createExclusiveLockProcedure(5);
+ assertTrue(queue.waitGlobalExclusiveLock(procedure2, globalId));
+
+ // Same globalId, so there is still only 1 LockedResource
+ locks = queue.getLocks();
+ assertEquals(1, locks.size());
+
+ resource = locks.get(0);
+ assertLockResource(resource, LockedResourceType.GLOBAL, globalId);
+ // The LockedResource owner is still the original procedure
+ assertExclusiveLock(resource, procedure);
+ // The new procedure should be in the waiting list
+ assertEquals(1, resource.getWaitingProcedures().size());
+ }
+
@Test
public void testListLocksWaiting() throws Exception {
LockProcedure procedure1 = createExclusiveLockProcedure(1);