You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/05/02 16:48:37 UTC
[shardingsphere] branch master updated: Add ProcessListSubscriber interface (#25432)
This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 814de015e75 Add ProcessListSubscriber interface (#25432)
814de015e75 is described below
commit 814de015e757dd859f2ca6911bff88660bc5c816
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Wed May 3 00:48:30 2023 +0800
Add ProcessListSubscriber interface (#25432)
* Refactor ClusterProcessListSubscriber
* Add ProcessListSubscriber interface
* Add ProcessListSubscriber interface
---
.../process/ShowProcessListResponseEvent.java | 2 +-
.../ProcessListSubscriber.java} | 29 ++++++++++++-----
.../subscriber/ClusterProcessListSubscriber.java | 38 ++++++++--------------
.../ClusterProcessListSubscriberTest.java | 6 ++--
.../StandaloneProcessListSubscriber.java | 20 ++++--------
.../StandaloneProcessListSubscriberTest.java | 2 +-
.../admin/executor/ShowProcessListExecutor.java | 2 +-
7 files changed, 44 insertions(+), 55 deletions(-)
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/process/ShowProcessListResponseEvent.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/process/ShowProcessListResponseEvent.java
index 8e90327bf7b..8250b24d0e3 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/process/ShowProcessListResponseEvent.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/process/ShowProcessListResponseEvent.java
@@ -29,5 +29,5 @@ import java.util.Collection;
@Getter
public final class ShowProcessListResponseEvent {
- private final Collection<String> batchProcessContexts;
+ private final Collection<String> processListContexts;
}
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/process/ShowProcessListResponseEvent.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/process/ProcessListSubscriber.java
similarity index 54%
copy from mode/core/src/main/java/org/apache/shardingsphere/mode/event/process/ShowProcessListResponseEvent.java
copy to mode/core/src/main/java/org/apache/shardingsphere/mode/process/ProcessListSubscriber.java
index 8e90327bf7b..383a964a508 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/process/ShowProcessListResponseEvent.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/process/ProcessListSubscriber.java
@@ -15,19 +15,30 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.event.process;
+package org.apache.shardingsphere.mode.process;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.mode.event.process.KillProcessRequestEvent;
+import org.apache.shardingsphere.mode.event.process.ShowProcessListRequestEvent;
-import java.util.Collection;
+import java.sql.SQLException;
/**
- * Show process list response event.
+ * Processlist subscriber.
*/
-@RequiredArgsConstructor
-@Getter
-public final class ShowProcessListResponseEvent {
+public interface ProcessListSubscriber {
- private final Collection<String> batchProcessContexts;
+ /**
+ * Post show process list data.
+ *
+ * @param event show process list request event
+ */
+ void postShowProcessListData(ShowProcessListRequestEvent event);
+
+ /**
+ * Kill process.
+ *
+ * @param event kill process request event
+ * @throws SQLException SQL exception
+ */
+ void killProcess(KillProcessRequestEvent event) throws SQLException;
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessListSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessListSubscriber.java
index 50ffc0ef5f2..31bfa25c42e 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessListSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessListSubscriber.java
@@ -23,15 +23,14 @@ import org.apache.shardingsphere.infra.executor.sql.process.lock.ShowProcessList
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
+import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
import org.apache.shardingsphere.mode.event.process.KillProcessRequestEvent;
import org.apache.shardingsphere.mode.event.process.ShowProcessListRequestEvent;
import org.apache.shardingsphere.mode.event.process.ShowProcessListResponseEvent;
-import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
+import org.apache.shardingsphere.mode.process.ProcessListSubscriber;
import org.apache.shardingsphere.mode.spi.PersistRepository;
import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
@@ -41,7 +40,7 @@ import java.util.stream.Stream;
* Cluster processlist subscriber.
*/
@SuppressWarnings("UnstableApiUsage")
-public final class ClusterProcessListSubscriber {
+public final class ClusterProcessListSubscriber implements ProcessListSubscriber {
private final PersistRepository repository;
@@ -53,20 +52,16 @@ public final class ClusterProcessListSubscriber {
eventBusContext.register(this);
}
- /**
- * Load show process list data.
- *
- * @param event get children request event.
- */
+ @Override
@Subscribe
- public void loadShowProcessListData(final ShowProcessListRequestEvent event) {
+ public void postShowProcessListData(final ShowProcessListRequestEvent event) {
String processId = new UUID(ThreadLocalRandom.current().nextLong(), ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
boolean triggerIsComplete = false;
Collection<String> triggerPaths = getTriggerPaths(processId);
try {
triggerPaths.forEach(each -> repository.persist(each, ""));
triggerIsComplete = waitAllNodeDataReady(processId, triggerPaths);
- sendShowProcessList(processId);
+ postShowProcessListData(processId);
} finally {
repository.delete(ProcessNode.getProcessIdPath(processId));
if (!triggerIsComplete) {
@@ -75,26 +70,19 @@ public final class ClusterProcessListSubscriber {
}
}
+ private void postShowProcessListData(final String processId) {
+ Collection<String> yamlProcessListContexts = repository.getChildrenKeys(ProcessNode.getProcessIdPath(processId)).stream()
+ .map(each -> repository.getDirectly(ProcessNode.getProcessListInstancePath(processId, each))).collect(Collectors.toList());
+ eventBusContext.post(new ShowProcessListResponseEvent(yamlProcessListContexts));
+ }
+
private Collection<String> getTriggerPaths(final String processId) {
return Stream.of(InstanceType.values())
.flatMap(each -> repository.getChildrenKeys(ComputeNode.getOnlineNodePath(each)).stream().map(onlinePath -> ComputeNode.getProcessTriggerInstanceIdNodePath(onlinePath, processId)))
.collect(Collectors.toList());
}
- private void sendShowProcessList(final String processId) {
- List<String> childrenKeys = repository.getChildrenKeys(ProcessNode.getProcessIdPath(processId));
- Collection<String> batchProcessContexts = new LinkedList<>();
- for (String each : childrenKeys) {
- batchProcessContexts.add(repository.getDirectly(ProcessNode.getProcessListInstancePath(processId, each)));
- }
- eventBusContext.post(new ShowProcessListResponseEvent(batchProcessContexts));
- }
-
- /**
- * Kill process.
- *
- * @param event kill process request event
- */
+ @Override
@Subscribe
public void killProcess(final KillProcessRequestEvent event) {
String processId = event.getId();
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessListSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessListSubscriberTest.java
index fcd59e1ba50..77be537b9a1 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessListSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessListSubscriberTest.java
@@ -31,7 +31,6 @@ import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Collections;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -51,12 +50,11 @@ class ClusterProcessListSubscriberTest {
}
@Test
- void assertLoadShowProcessListData() {
+ void assertPostShowProcessListData() {
when(repository.getChildrenKeys(ComputeNode.getOnlineNodePath(InstanceType.JDBC))).thenReturn(Collections.emptyList());
when(repository.getChildrenKeys(ComputeNode.getOnlineNodePath(InstanceType.PROXY))).thenReturn(Collections.singletonList("abc"));
when(repository.getDirectly(any())).thenReturn(null);
- ShowProcessListRequestEvent showProcessListRequestEvent = mock(ShowProcessListRequestEvent.class);
- clusterProcessListSubscriber.loadShowProcessListData(showProcessListRequestEvent);
+ clusterProcessListSubscriber.postShowProcessListData(new ShowProcessListRequestEvent());
verify(repository).persist(any(), any());
}
}
diff --git a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessListSubscriber.java b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessListSubscriber.java
index bf67b24a22a..c3ca49d42aa 100644
--- a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessListSubscriber.java
+++ b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessListSubscriber.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.event.process.KillProcessRequestEvent;
import org.apache.shardingsphere.mode.event.process.ShowProcessListRequestEvent;
import org.apache.shardingsphere.mode.event.process.ShowProcessListResponseEvent;
+import org.apache.shardingsphere.mode.process.ProcessListSubscriber;
import java.sql.SQLException;
import java.sql.Statement;
@@ -36,7 +37,7 @@ import java.util.Collections;
* Standalone processlist subscriber.
*/
@SuppressWarnings("UnstableApiUsage")
-public final class StandaloneProcessListSubscriber {
+public final class StandaloneProcessListSubscriber implements ProcessListSubscriber {
private final EventBusContext eventBusContext;
@@ -47,23 +48,14 @@ public final class StandaloneProcessListSubscriber {
eventBusContext.register(this);
}
- /**
- * Post show process list data.
- *
- * @param event show process list request event
- */
+ @Override
@Subscribe
public void postShowProcessListData(final ShowProcessListRequestEvent event) {
- YamlProcessListContexts yamlContexts = swapper.swapToYamlConfiguration(ProcessRegistry.getInstance().getAllProcessContexts());
- eventBusContext.post(new ShowProcessListResponseEvent(Collections.singleton(YamlEngine.marshal(yamlContexts))));
+ YamlProcessListContexts yamlProcessListContexts = swapper.swapToYamlConfiguration(ProcessRegistry.getInstance().getAllProcessContexts());
+ eventBusContext.post(new ShowProcessListResponseEvent(Collections.singleton(YamlEngine.marshal(yamlProcessListContexts))));
}
- /**
- * Kill process.
- *
- * @param event kill process request event
- * @throws SQLException SQL exception
- */
+ @Override
@Subscribe
public void killProcess(final KillProcessRequestEvent event) throws SQLException {
ProcessContext processContext = ProcessRegistry.getInstance().getProcessContext(event.getId());
diff --git a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessListSubscriberTest.java b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessListSubscriberTest.java
index 98299a5d054..f677f688e2a 100644
--- a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessListSubscriberTest.java
+++ b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessListSubscriberTest.java
@@ -37,7 +37,7 @@ class StandaloneProcessListSubscriberTest {
void assertPostShowProcessListData() {
ProcessRegistry processRegistry = mock(ProcessRegistry.class);
when(ProcessRegistry.getInstance()).thenReturn(processRegistry);
- new StandaloneProcessListSubscriber(new EventBusContext()).postShowProcessListData(mock(ShowProcessListRequestEvent.class));
+ new StandaloneProcessListSubscriber(new EventBusContext()).postShowProcessListData(new ShowProcessListRequestEvent());
verify(processRegistry).getAllProcessContexts();
}
}
diff --git a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
index 09aa582d7db..19a64fd65d4 100644
--- a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
+++ b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
@@ -70,7 +70,7 @@ public final class ShowProcessListExecutor implements DatabaseAdminQueryExecutor
*/
@Subscribe
public void receiveProcessListData(final ShowProcessListResponseEvent event) {
- batchProcessContexts = event.getBatchProcessContexts();
+ batchProcessContexts = event.getProcessListContexts();
}
@Override