You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2023/05/04 11:42:13 UTC

[shardingsphere] branch master updated: Refactor ProcessOperationLockRegistry (#25450)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d697a45517c Refactor ProcessOperationLockRegistry (#25450)
d697a45517c is described below

commit d697a45517c7ca85460a04d9cf4895999392f54f
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Thu May 4 19:42:06 2023 +0800

    Refactor ProcessOperationLockRegistry (#25450)
---
 .../process/lock/ProcessOperationLockRegistry.java | 40 ++++++++++++++++++++--
 ...va => ProcessOperationLockReleaseStrategy.java} | 28 ++++-----------
 .../subscriber/ClusterProcessListSubscriber.java   | 30 ++++------------
 .../subscriber/ProcessListChangedSubscriber.java   | 11 ++----
 .../ProcessListChangedSubscriberTest.java          | 30 ++++++++--------
 5 files changed, 66 insertions(+), 73 deletions(-)

diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java
index 32cd0e54963..db240a52630 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.infra.executor.sql.process.lock;
 
 import lombok.AccessLevel;
-import lombok.Getter;
 import lombok.NoArgsConstructor;
 
 import java.util.Map;
@@ -28,7 +27,6 @@ import java.util.concurrent.ConcurrentHashMap;
  * Process operation lock registry.
  */
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
-@Getter
 public final class ProcessOperationLockRegistry {
     
     private static final ProcessOperationLockRegistry INSTANCE = new ProcessOperationLockRegistry();
@@ -36,11 +34,47 @@ public final class ProcessOperationLockRegistry {
     private final Map<String, ProcessOperationLock> locks = new ConcurrentHashMap<>();
     
     /**
-     * Get process registry.
+     * Get process operation lock registry.
      *
      * @return got instance
      */
     public static ProcessOperationLockRegistry getInstance() {
         return INSTANCE;
     }
+    
+    /**
+     * Wait until release ready.
+     * 
+     * @param lockId lock ID
+     * @param releaseStrategy process operation lock release strategy
+     * @return release ready or not
+     */
+    public boolean waitUntilReleaseReady(final String lockId, final ProcessOperationLockReleaseStrategy releaseStrategy) {
+        ProcessOperationLock lock = new ProcessOperationLock();
+        locks.put(lockId, lock);
+        lock.lock();
+        try {
+            while (!releaseStrategy.isReadyToRelease()) {
+                if (!lock.awaitDefaultTime()) {
+                    return false;
+                }
+            }
+            return true;
+        } finally {
+            lock.unlock();
+            locks.remove(lockId);
+        }
+    }
+    
+    /**
+     * Notify lock.
+     * 
+     * @param lockId lock ID
+     */
+    public void notify(final String lockId) {
+        ProcessOperationLock lock = locks.get(lockId);
+        if (null != lock) {
+            lock.doNotify();
+        }
+    }
 }
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockReleaseStrategy.java
similarity index 58%
copy from infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java
copy to infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockReleaseStrategy.java
index 32cd0e54963..3e1c9f7898c 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockReleaseStrategy.java
@@ -17,30 +17,16 @@
 
 package org.apache.shardingsphere.infra.executor.sql.process.lock;
 
-import lombok.AccessLevel;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
- * Process operation lock registry.
+ * Process operation lock release strategy.
  */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-@Getter
-public final class ProcessOperationLockRegistry {
-    
-    private static final ProcessOperationLockRegistry INSTANCE = new ProcessOperationLockRegistry();
-    
-    private final Map<String, ProcessOperationLock> locks = new ConcurrentHashMap<>();
+@FunctionalInterface
+public interface ProcessOperationLockReleaseStrategy {
     
     /**
-     * Get process registry.
-     *
-     * @return got instance
+     * Judge lock is ready to release.
+     * 
+     * @return lock is ready to release or not
      */
-    public static ProcessOperationLockRegistry getInstance() {
-        return INSTANCE;
-    }
+    boolean isReadyToRelease();
 }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessListSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessListSubscriber.java
index 7d031976c14..a5c06ffe1f3 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessListSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessListSubscriber.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber;
 
 import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLock;
 import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
@@ -60,7 +59,7 @@ public final class ClusterProcessListSubscriber implements ProcessListSubscriber
         boolean isCompleted = false;
         try {
             triggerPaths.forEach(each -> repository.persist(each, ""));
-            isCompleted = waitAllInstancesReady(taskId, triggerPaths);
+            isCompleted = ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(taskId, () -> isReady(triggerPaths));
             postShowProcessListData(taskId);
         } finally {
             repository.delete(ProcessNode.getProcessIdPath(taskId));
@@ -82,6 +81,10 @@ public final class ClusterProcessListSubscriber implements ProcessListSubscriber
                 .collect(Collectors.toList());
     }
     
+    private boolean isReady(final Collection<String> paths) {
+        return paths.stream().noneMatch(each -> null != repository.getDirectly(each));
+    }
+    
     @Override
     @Subscribe
     public void killProcess(final KillProcessRequestEvent event) {
@@ -90,7 +93,7 @@ public final class ClusterProcessListSubscriber implements ProcessListSubscriber
         boolean isCompleted = false;
         try {
             triggerPaths.forEach(each -> repository.persist(each, ""));
-            isCompleted = waitAllInstancesReady(processId, triggerPaths);
+            isCompleted = ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(processId, () -> isReady(triggerPaths));
         } finally {
             if (!isCompleted) {
                 triggerPaths.forEach(repository::delete);
@@ -103,25 +106,4 @@ public final class ClusterProcessListSubscriber implements ProcessListSubscriber
                 .flatMap(each -> repository.getChildrenKeys(ComputeNode.getOnlineNodePath(each)).stream().map(onlinePath -> ComputeNode.getProcessKillInstanceIdNodePath(onlinePath, processId)))
                 .collect(Collectors.toList());
     }
-    
-    private boolean waitAllInstancesReady(final String lockId, final Collection<String> paths) {
-        ProcessOperationLock lock = new ProcessOperationLock();
-        ProcessOperationLockRegistry.getInstance().getLocks().put(lockId, lock);
-        lock.lock();
-        try {
-            while (!isReady(paths)) {
-                if (!lock.awaitDefaultTime()) {
-                    return false;
-                }
-            }
-            return true;
-        } finally {
-            lock.unlock();
-            ProcessOperationLockRegistry.getInstance().getLocks().remove(lockId);
-        }
-    }
-    
-    private boolean isReady(final Collection<String> paths) {
-        return paths.stream().noneMatch(each -> null != repository.getDirectly(each));
-    }
 }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
index 4f9d34742d6..ff0c25fcd5b 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.proc
 import com.google.common.eventbus.Subscribe;
 import org.apache.shardingsphere.infra.executor.sql.process.Process;
 import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
-import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLock;
 import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
 import org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -80,10 +79,7 @@ public final class ProcessListChangedSubscriber {
      */
     @Subscribe
     public synchronized void completeToReportLocalProcesses(final ReportLocalProcessesCompletedEvent event) {
-        ProcessOperationLock lock = ProcessOperationLockRegistry.getInstance().getLocks().get(event.getTaskId());
-        if (null != lock) {
-            lock.doNotify();
-        }
+        ProcessOperationLockRegistry.getInstance().notify(event.getTaskId());
     }
     
     /**
@@ -113,9 +109,6 @@ public final class ProcessListChangedSubscriber {
      */
     @Subscribe
     public synchronized void completeToKillProcessInstance(final KillProcessInstanceCompleteEvent event) {
-        ProcessOperationLock lock = ProcessOperationLockRegistry.getInstance().getLocks().get(event.getProcessId());
-        if (null != lock) {
-            lock.doNotify();
-        }
+        ProcessOperationLockRegistry.getInstance().notify(event.getProcessId());
     }
 }
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
index 1302b652be9..224efdd21b1 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
@@ -22,8 +22,8 @@ import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.infra.executor.sql.process.Process;
 import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
-import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLock;
 import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
+import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockReleaseStrategy;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
 import org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
@@ -58,6 +58,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -117,8 +118,6 @@ class ProcessListChangedSubscriberTest {
     @Test
     void assertCompleteToReportLocalProcesses() {
         String taskId = "foo_id";
-        ProcessOperationLock lock = new ProcessOperationLock();
-        ProcessOperationLockRegistry.getInstance().getLocks().put(taskId, lock);
         long startMillis = System.currentTimeMillis();
         ExecutorService executorService = Executors.newFixedThreadPool(1);
         executorService.submit(() -> {
@@ -128,11 +127,10 @@ class ProcessListChangedSubscriberTest {
             }
             subscriber.completeToReportLocalProcesses(new ReportLocalProcessesCompletedEvent(taskId));
         });
-        lockAndAwaitDefaultTime(lock);
+        waitUntilReleaseReady(taskId);
         long currentTime = System.currentTimeMillis();
         assertTrue(currentTime >= startMillis + 50L);
         assertTrue(currentTime <= startMillis + 5000L);
-        ProcessOperationLockRegistry.getInstance().getLocks().remove(taskId);
     }
     
     @Test
@@ -147,8 +145,6 @@ class ProcessListChangedSubscriberTest {
     @Test
     void assertCompleteToKillProcessInstance() {
         String processId = "foo_id";
-        ProcessOperationLock lock = new ProcessOperationLock();
-        ProcessOperationLockRegistry.getInstance().getLocks().put(processId, lock);
         long startMillis = System.currentTimeMillis();
         ExecutorService executorService = Executors.newFixedThreadPool(1);
         executorService.submit(() -> {
@@ -158,19 +154,21 @@ class ProcessListChangedSubscriberTest {
             }
             subscriber.completeToKillProcessInstance(new KillProcessInstanceCompleteEvent(processId));
         });
-        lockAndAwaitDefaultTime(lock);
+        waitUntilReleaseReady(processId);
         long currentTime = System.currentTimeMillis();
         assertTrue(currentTime >= startMillis + 50L);
         assertTrue(currentTime <= startMillis + 5000L);
-        ProcessOperationLockRegistry.getInstance().getLocks().remove(processId);
     }
     
-    private void lockAndAwaitDefaultTime(final ProcessOperationLock lock) {
-        lock.lock();
-        try {
-            lock.awaitDefaultTime();
-        } finally {
-            lock.unlock();
-        }
+    private static void waitUntilReleaseReady(final String lockId) {
+        ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(lockId, new ProcessOperationLockReleaseStrategy() {
+            
+            private final AtomicBoolean firstTime = new AtomicBoolean(true);
+            
+            @Override
+            public boolean isReadyToRelease() {
+                return !firstTime.getAndSet(false);
+            }
+        });
     }
 }