You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2022/07/07 00:36:39 UTC

[hbase] branch master updated: Revert "HBASE-27157 Potential race condition in WorkerAssigner (#4577)"

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 0116aff9c09 Revert "HBASE-27157 Potential race condition in WorkerAssigner (#4577)"
0116aff9c09 is described below

commit 0116aff9c098963cd8b3ca71598bab93a0424a1e
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Thu Jul 7 08:36:21 2022 +0800

    Revert "HBASE-27157 Potential race condition in WorkerAssigner (#4577)"
    
    This reverts commit f76d8554ca586f925a1514060e6437fcb0e477d6.
---
 .../hadoop/hbase/master/SplitWALManager.java       | 17 ++++++++----
 .../apache/hadoop/hbase/master/WorkerAssigner.java | 31 ++++++++++------------
 .../master/procedure/SnapshotVerifyProcedure.java  |  3 ++-
 .../hbase/master/procedure/SplitWALProcedure.java  |  2 +-
 .../hbase/master/snapshot/SnapshotManager.java     | 15 ++++++++---
 .../hadoop/hbase/master/TestSplitWALManager.java   |  5 ++--
 6 files changed, 43 insertions(+), 30 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
index 32b2f4d21f2..58eeeef6c5d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
 import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
@@ -151,19 +152,25 @@ public class SplitWALManager {
    */
   public ServerName acquireSplitWALWorker(Procedure<?> procedure)
     throws ProcedureSuspendedException {
-    ServerName worker = splitWorkerAssigner.acquire(procedure);
-    LOG.debug("Acquired split WAL worker={}", worker);
-    return worker;
+    Optional<ServerName> worker = splitWorkerAssigner.acquire();
+    if (worker.isPresent()) {
+      LOG.debug("Acquired split WAL worker={}", worker.get());
+      return worker.get();
+    }
+    splitWorkerAssigner.suspend(procedure);
+    throw new ProcedureSuspendedException();
   }
 
   /**
    * After the worker finished the split WAL task, it will release the worker, and wake up all the
    * suspend procedures in the ProcedureEvent
-   * @param worker worker which is about to release
+   * @param worker    worker which is about to release
+   * @param scheduler scheduler which is to wake up the procedure event
    */
-  public void releaseSplitWALWorker(ServerName worker) {
+  public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler scheduler) {
     LOG.debug("Release split WAL worker={}", worker);
     splitWorkerAssigner.release(worker);
+    splitWorkerAssigner.wake(scheduler);
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java
index b1f2558045d..b6df41acee2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java
@@ -23,9 +23,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
-import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -51,7 +51,7 @@ public class WorkerAssigner implements ServerListener {
     }
   }
 
-  public synchronized ServerName acquire(Procedure<?> proc) throws ProcedureSuspendedException {
+  public synchronized Optional<ServerName> acquire() {
     List<ServerName> serverList = master.getServerManager().getOnlineServersList();
     Collections.shuffle(serverList);
     Optional<ServerName> worker = serverList.stream()
@@ -60,30 +60,27 @@ public class WorkerAssigner implements ServerListener {
       .findAny();
     worker.ifPresent(name -> currentWorkers.compute(name, (serverName,
       availableWorker) -> availableWorker == null ? maxTasks - 1 : availableWorker - 1));
-    if (worker.isPresent()) {
-      ServerName sn = worker.get();
-      currentWorkers.compute(sn, (serverName,
-        availableWorker) -> availableWorker == null ? maxTasks - 1 : availableWorker - 1);
-      return sn;
-    } else {
-      event.suspend();
-      event.suspendIfNotReady(proc);
-      throw new ProcedureSuspendedException();
-    }
+    return worker;
   }
 
   public synchronized void release(ServerName serverName) {
     currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1);
+  }
+
+  public void suspend(Procedure<?> proc) {
+    event.suspend();
+    event.suspendIfNotReady(proc);
+  }
+
+  public void wake(MasterProcedureScheduler scheduler) {
     if (!event.isReady()) {
-      event.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
+      event.wake(scheduler);
     }
   }
 
   @Override
-  public synchronized void serverAdded(ServerName worker) {
-    if (!event.isReady()) {
-      event.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
-    }
+  public void serverAdded(ServerName worker) {
+    this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
   }
 
   public synchronized void addUsedWorker(ServerName worker) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java
index 8ec261d768c..651822ff5b2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java
@@ -109,7 +109,8 @@ public class SnapshotVerifyProcedure extends ServerRemoteProcedure
       setFailure("verify-snapshot", e);
     } finally {
       // release the worker
-      env.getMasterServices().getSnapshotManager().releaseSnapshotVerifyWorker(this, targetServer);
+      env.getMasterServices().getSnapshotManager().releaseSnapshotVerifyWorker(this, targetServer,
+        env.getProcedureScheduler());
     }
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java
index 98c2c0ec693..699834f9c1d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java
@@ -90,7 +90,7 @@ public class SplitWALProcedure
           skipPersistence();
           throw new ProcedureSuspendedException();
         }
-        splitWALManager.releaseSplitWALWorker(worker);
+        splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler());
         if (!finished) {
           LOG.warn("Failed to split wal {} by server {}, retry...", walPath, worker);
           setNextState(MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
index ab682aaeb70..69f4c52d74f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
 import org.apache.hadoop.hbase.master.procedure.CloneSnapshotProcedure;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
 import org.apache.hadoop.hbase.master.procedure.RestoreSnapshotProcedure;
 import org.apache.hadoop.hbase.master.procedure.SnapshotProcedure;
@@ -1417,14 +1418,20 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
 
   public ServerName acquireSnapshotVerifyWorker(SnapshotVerifyProcedure procedure)
     throws ProcedureSuspendedException {
-    ServerName worker = verifyWorkerAssigner.acquire(procedure);
-    LOG.debug("{} Acquired verify snapshot worker={}", procedure, worker);
-    return worker;
+    Optional<ServerName> worker = verifyWorkerAssigner.acquire();
+    if (worker.isPresent()) {
+      LOG.debug("{} Acquired verify snapshot worker={}", procedure, worker.get());
+      return worker.get();
+    }
+    verifyWorkerAssigner.suspend(procedure);
+    throw new ProcedureSuspendedException();
   }
 
-  public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure, ServerName worker) {
+  public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure, ServerName worker,
+    MasterProcedureScheduler scheduler) {
     LOG.debug("{} Release verify snapshot worker={}", procedure, worker);
     verifyWorkerAssigner.release(worker);
+    verifyWorkerAssigner.wake(scheduler);
   }
 
   private void restoreWorkers() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java
index 4665b9c16f8..ea92f792279 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java
@@ -115,7 +115,8 @@ public class TestSplitWALManager {
     Assert.assertNotNull(e);
     Assert.assertTrue(e instanceof ProcedureSuspendedException);
 
-    splitWALManager.releaseSplitWALWorker(server);
+    splitWALManager.releaseSplitWALWorker(server, TEST_UTIL.getHBaseCluster().getMaster()
+      .getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
     Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
   }
 
@@ -347,7 +348,7 @@ public class TestSplitWALManager {
           setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER);
           return Flow.HAS_MORE_STATE;
         case RELEASE_SPLIT_WORKER:
-          splitWALManager.releaseSplitWALWorker(worker);
+          splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler());
           return Flow.NO_MORE_STATE;
         default:
           throw new UnsupportedOperationException("unhandled state=" + state);