You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2023/05/04 11:42:13 UTC
[shardingsphere] branch master updated: Refactor ProcessOperationLockRegistry (#25450)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d697a45517c Refactor ProcessOperationLockRegistry (#25450)
d697a45517c is described below
commit d697a45517c7ca85460a04d9cf4895999392f54f
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Thu May 4 19:42:06 2023 +0800
Refactor ProcessOperationLockRegistry (#25450)
---
.../process/lock/ProcessOperationLockRegistry.java | 40 ++++++++++++++++++++--
...va => ProcessOperationLockReleaseStrategy.java} | 28 ++++-----------
.../subscriber/ClusterProcessListSubscriber.java | 30 ++++------------
.../subscriber/ProcessListChangedSubscriber.java | 11 ++----
.../ProcessListChangedSubscriberTest.java | 30 ++++++++--------
5 files changed, 66 insertions(+), 73 deletions(-)
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java
index 32cd0e54963..db240a52630 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.infra.executor.sql.process.lock;
import lombok.AccessLevel;
-import lombok.Getter;
import lombok.NoArgsConstructor;
import java.util.Map;
@@ -28,7 +27,6 @@ import java.util.concurrent.ConcurrentHashMap;
* Process operation lock registry.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
-@Getter
public final class ProcessOperationLockRegistry {
private static final ProcessOperationLockRegistry INSTANCE = new ProcessOperationLockRegistry();
@@ -36,11 +34,47 @@ public final class ProcessOperationLockRegistry {
private final Map<String, ProcessOperationLock> locks = new ConcurrentHashMap<>();
/**
- * Get process registry.
+ * Get process operation lock registry.
*
* @return got instance
*/
public static ProcessOperationLockRegistry getInstance() {
return INSTANCE;
}
+
+ /**
+ * Wait until release ready.
+ *
+ * @param lockId lock ID
+ * @param releaseStrategy process operation lock release strategy
+ * @return release ready or not
+ */
+ public boolean waitUntilReleaseReady(final String lockId, final ProcessOperationLockReleaseStrategy releaseStrategy) {
+ ProcessOperationLock lock = new ProcessOperationLock();
+ locks.put(lockId, lock);
+ lock.lock();
+ try {
+ while (!releaseStrategy.isReadyToRelease()) {
+ if (!lock.awaitDefaultTime()) {
+ return false;
+ }
+ }
+ return true;
+ } finally {
+ lock.unlock();
+ locks.remove(lockId);
+ }
+ }
+
+ /**
+ * Notify lock.
+ *
+ * @param lockId lock ID
+ */
+ public void notify(final String lockId) {
+ ProcessOperationLock lock = locks.get(lockId);
+ if (null != lock) {
+ lock.doNotify();
+ }
+ }
}
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockReleaseStrategy.java
similarity index 58%
copy from infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java
copy to infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockReleaseStrategy.java
index 32cd0e54963..3e1c9f7898c 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockRegistry.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/lock/ProcessOperationLockReleaseStrategy.java
@@ -17,30 +17,16 @@
package org.apache.shardingsphere.infra.executor.sql.process.lock;
-import lombok.AccessLevel;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
/**
- * Process operation lock registry.
+ * Process operation lock release strategy.
*/
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-@Getter
-public final class ProcessOperationLockRegistry {
-
- private static final ProcessOperationLockRegistry INSTANCE = new ProcessOperationLockRegistry();
-
- private final Map<String, ProcessOperationLock> locks = new ConcurrentHashMap<>();
+@FunctionalInterface
+public interface ProcessOperationLockReleaseStrategy {
/**
- * Get process registry.
- *
- * @return got instance
+ * Judge lock is ready to release.
+ *
+ * @return lock is ready to release or not
*/
- public static ProcessOperationLockRegistry getInstance() {
- return INSTANCE;
- }
+ boolean isReadyToRelease();
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessListSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessListSubscriber.java
index 7d031976c14..a5c06ffe1f3 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessListSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessListSubscriber.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber;
import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLock;
import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
@@ -60,7 +59,7 @@ public final class ClusterProcessListSubscriber implements ProcessListSubscriber
boolean isCompleted = false;
try {
triggerPaths.forEach(each -> repository.persist(each, ""));
- isCompleted = waitAllInstancesReady(taskId, triggerPaths);
+ isCompleted = ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(taskId, () -> isReady(triggerPaths));
postShowProcessListData(taskId);
} finally {
repository.delete(ProcessNode.getProcessIdPath(taskId));
@@ -82,6 +81,10 @@ public final class ClusterProcessListSubscriber implements ProcessListSubscriber
.collect(Collectors.toList());
}
+ private boolean isReady(final Collection<String> paths) {
+ return paths.stream().noneMatch(each -> null != repository.getDirectly(each));
+ }
+
@Override
@Subscribe
public void killProcess(final KillProcessRequestEvent event) {
@@ -90,7 +93,7 @@ public final class ClusterProcessListSubscriber implements ProcessListSubscriber
boolean isCompleted = false;
try {
triggerPaths.forEach(each -> repository.persist(each, ""));
- isCompleted = waitAllInstancesReady(processId, triggerPaths);
+ isCompleted = ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(processId, () -> isReady(triggerPaths));
} finally {
if (!isCompleted) {
triggerPaths.forEach(repository::delete);
@@ -103,25 +106,4 @@ public final class ClusterProcessListSubscriber implements ProcessListSubscriber
.flatMap(each -> repository.getChildrenKeys(ComputeNode.getOnlineNodePath(each)).stream().map(onlinePath -> ComputeNode.getProcessKillInstanceIdNodePath(onlinePath, processId)))
.collect(Collectors.toList());
}
-
- private boolean waitAllInstancesReady(final String lockId, final Collection<String> paths) {
- ProcessOperationLock lock = new ProcessOperationLock();
- ProcessOperationLockRegistry.getInstance().getLocks().put(lockId, lock);
- lock.lock();
- try {
- while (!isReady(paths)) {
- if (!lock.awaitDefaultTime()) {
- return false;
- }
- }
- return true;
- } finally {
- lock.unlock();
- ProcessOperationLockRegistry.getInstance().getLocks().remove(lockId);
- }
- }
-
- private boolean isReady(final Collection<String> paths) {
- return paths.stream().noneMatch(each -> null != repository.getDirectly(each));
- }
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
index 4f9d34742d6..ff0c25fcd5b 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.proc
import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.infra.executor.sql.process.Process;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
-import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLock;
import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
import org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -80,10 +79,7 @@ public final class ProcessListChangedSubscriber {
*/
@Subscribe
public synchronized void completeToReportLocalProcesses(final ReportLocalProcessesCompletedEvent event) {
- ProcessOperationLock lock = ProcessOperationLockRegistry.getInstance().getLocks().get(event.getTaskId());
- if (null != lock) {
- lock.doNotify();
- }
+ ProcessOperationLockRegistry.getInstance().notify(event.getTaskId());
}
/**
@@ -113,9 +109,6 @@ public final class ProcessListChangedSubscriber {
*/
@Subscribe
public synchronized void completeToKillProcessInstance(final KillProcessInstanceCompleteEvent event) {
- ProcessOperationLock lock = ProcessOperationLockRegistry.getInstance().getLocks().get(event.getProcessId());
- if (null != lock) {
- lock.doNotify();
- }
+ ProcessOperationLockRegistry.getInstance().notify(event.getProcessId());
}
}
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
index 1302b652be9..224efdd21b1 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
@@ -22,8 +22,8 @@ import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.infra.executor.sql.process.Process;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
-import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLock;
import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
+import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockReleaseStrategy;
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
@@ -58,6 +58,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
@@ -117,8 +118,6 @@ class ProcessListChangedSubscriberTest {
@Test
void assertCompleteToReportLocalProcesses() {
String taskId = "foo_id";
- ProcessOperationLock lock = new ProcessOperationLock();
- ProcessOperationLockRegistry.getInstance().getLocks().put(taskId, lock);
long startMillis = System.currentTimeMillis();
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.submit(() -> {
@@ -128,11 +127,10 @@ class ProcessListChangedSubscriberTest {
}
subscriber.completeToReportLocalProcesses(new ReportLocalProcessesCompletedEvent(taskId));
});
- lockAndAwaitDefaultTime(lock);
+ waitUntilReleaseReady(taskId);
long currentTime = System.currentTimeMillis();
assertTrue(currentTime >= startMillis + 50L);
assertTrue(currentTime <= startMillis + 5000L);
- ProcessOperationLockRegistry.getInstance().getLocks().remove(taskId);
}
@Test
@@ -147,8 +145,6 @@ class ProcessListChangedSubscriberTest {
@Test
void assertCompleteToKillProcessInstance() {
String processId = "foo_id";
- ProcessOperationLock lock = new ProcessOperationLock();
- ProcessOperationLockRegistry.getInstance().getLocks().put(processId, lock);
long startMillis = System.currentTimeMillis();
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.submit(() -> {
@@ -158,19 +154,21 @@ class ProcessListChangedSubscriberTest {
}
subscriber.completeToKillProcessInstance(new KillProcessInstanceCompleteEvent(processId));
});
- lockAndAwaitDefaultTime(lock);
+ waitUntilReleaseReady(processId);
long currentTime = System.currentTimeMillis();
assertTrue(currentTime >= startMillis + 50L);
assertTrue(currentTime <= startMillis + 5000L);
- ProcessOperationLockRegistry.getInstance().getLocks().remove(processId);
}
- private void lockAndAwaitDefaultTime(final ProcessOperationLock lock) {
- lock.lock();
- try {
- lock.awaitDefaultTime();
- } finally {
- lock.unlock();
- }
+ private static void waitUntilReleaseReady(final String lockId) {
+ ProcessOperationLockRegistry.getInstance().waitUntilReleaseReady(lockId, new ProcessOperationLockReleaseStrategy() {
+
+ private final AtomicBoolean firstTime = new AtomicBoolean(true);
+
+ @Override
+ public boolean isReadyToRelease() {
+ return !firstTime.getAndSet(false);
+ }
+ });
}
}