You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/05/02 16:48:37 UTC

[shardingsphere] branch master updated: Add ProcessListSubscriber interface (#25432)

This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 814de015e75 Add ProcessListSubscriber interface (#25432)
814de015e75 is described below

commit 814de015e757dd859f2ca6911bff88660bc5c816
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Wed May 3 00:48:30 2023 +0800

    Add ProcessListSubscriber interface (#25432)
    
    * Refactor ClusterProcessListSubscriber
    
    * Add ProcessListSubscriber interface
    
    * Add ProcessListSubscriber interface
---
 .../process/ShowProcessListResponseEvent.java      |  2 +-
 .../ProcessListSubscriber.java}                    | 29 ++++++++++++-----
 .../subscriber/ClusterProcessListSubscriber.java   | 38 ++++++++--------------
 .../ClusterProcessListSubscriberTest.java          |  6 ++--
 .../StandaloneProcessListSubscriber.java           | 20 ++++--------
 .../StandaloneProcessListSubscriberTest.java       |  2 +-
 .../admin/executor/ShowProcessListExecutor.java    |  2 +-
 7 files changed, 44 insertions(+), 55 deletions(-)

diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/process/ShowProcessListResponseEvent.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/process/ShowProcessListResponseEvent.java
index 8e90327bf7b..8250b24d0e3 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/process/ShowProcessListResponseEvent.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/event/process/ShowProcessListResponseEvent.java
@@ -29,5 +29,5 @@ import java.util.Collection;
 @Getter
 public final class ShowProcessListResponseEvent {
     
-    private final Collection<String> batchProcessContexts;
+    private final Collection<String> processListContexts;
 }
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/process/ShowProcessListResponseEvent.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/process/ProcessListSubscriber.java
similarity index 54%
copy from mode/core/src/main/java/org/apache/shardingsphere/mode/event/process/ShowProcessListResponseEvent.java
copy to mode/core/src/main/java/org/apache/shardingsphere/mode/process/ProcessListSubscriber.java
index 8e90327bf7b..383a964a508 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/event/process/ShowProcessListResponseEvent.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/process/ProcessListSubscriber.java
@@ -15,19 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.mode.event.process;
+package org.apache.shardingsphere.mode.process;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.mode.event.process.KillProcessRequestEvent;
+import org.apache.shardingsphere.mode.event.process.ShowProcessListRequestEvent;
 
-import java.util.Collection;
+import java.sql.SQLException;
 
 /**
- * Show process list response event.
+ * Processlist subscriber.
  */
-@RequiredArgsConstructor
-@Getter
-public final class ShowProcessListResponseEvent {
+public interface ProcessListSubscriber {
     
-    private final Collection<String> batchProcessContexts;
+    /**
+     * Post show process list data.
+     *
+     * @param event show process list request event
+     */
+    void postShowProcessListData(ShowProcessListRequestEvent event);
+    
+    /**
+     * Kill process.
+     *
+     * @param event kill process request event
+     * @throws SQLException SQL exception
+     */
+    void killProcess(KillProcessRequestEvent event) throws SQLException;
 }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessListSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessListSubscriber.java
index 50ffc0ef5f2..31bfa25c42e 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessListSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessListSubscriber.java
@@ -23,15 +23,14 @@ import org.apache.shardingsphere.infra.executor.sql.process.lock.ShowProcessList
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
+import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
 import org.apache.shardingsphere.mode.event.process.KillProcessRequestEvent;
 import org.apache.shardingsphere.mode.event.process.ShowProcessListRequestEvent;
 import org.apache.shardingsphere.mode.event.process.ShowProcessListResponseEvent;
-import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
+import org.apache.shardingsphere.mode.process.ProcessListSubscriber;
 import org.apache.shardingsphere.mode.spi.PersistRepository;
 
 import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
@@ -41,7 +40,7 @@ import java.util.stream.Stream;
  * Cluster processlist subscriber.
  */
 @SuppressWarnings("UnstableApiUsage")
-public final class ClusterProcessListSubscriber {
+public final class ClusterProcessListSubscriber implements ProcessListSubscriber {
     
     private final PersistRepository repository;
     
@@ -53,20 +52,16 @@ public final class ClusterProcessListSubscriber {
         eventBusContext.register(this);
     }
     
-    /**
-     * Load show process list data.
-     *
-     * @param event get children request event.
-     */
+    @Override
     @Subscribe
-    public void loadShowProcessListData(final ShowProcessListRequestEvent event) {
+    public void postShowProcessListData(final ShowProcessListRequestEvent event) {
         String processId = new UUID(ThreadLocalRandom.current().nextLong(), ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
         boolean triggerIsComplete = false;
         Collection<String> triggerPaths = getTriggerPaths(processId);
         try {
             triggerPaths.forEach(each -> repository.persist(each, ""));
             triggerIsComplete = waitAllNodeDataReady(processId, triggerPaths);
-            sendShowProcessList(processId);
+            postShowProcessListData(processId);
         } finally {
             repository.delete(ProcessNode.getProcessIdPath(processId));
             if (!triggerIsComplete) {
@@ -75,26 +70,19 @@ public final class ClusterProcessListSubscriber {
         }
     }
     
+    private void postShowProcessListData(final String processId) {
+        Collection<String> yamlProcessListContexts = repository.getChildrenKeys(ProcessNode.getProcessIdPath(processId)).stream()
+                .map(each -> repository.getDirectly(ProcessNode.getProcessListInstancePath(processId, each))).collect(Collectors.toList());
+        eventBusContext.post(new ShowProcessListResponseEvent(yamlProcessListContexts));
+    }
+    
     private Collection<String> getTriggerPaths(final String processId) {
         return Stream.of(InstanceType.values())
                 .flatMap(each -> repository.getChildrenKeys(ComputeNode.getOnlineNodePath(each)).stream().map(onlinePath -> ComputeNode.getProcessTriggerInstanceIdNodePath(onlinePath, processId)))
                 .collect(Collectors.toList());
     }
     
-    private void sendShowProcessList(final String processId) {
-        List<String> childrenKeys = repository.getChildrenKeys(ProcessNode.getProcessIdPath(processId));
-        Collection<String> batchProcessContexts = new LinkedList<>();
-        for (String each : childrenKeys) {
-            batchProcessContexts.add(repository.getDirectly(ProcessNode.getProcessListInstancePath(processId, each)));
-        }
-        eventBusContext.post(new ShowProcessListResponseEvent(batchProcessContexts));
-    }
-    
-    /**
-     * Kill process.
-     *
-     * @param event kill process request event
-     */
+    @Override
     @Subscribe
     public void killProcess(final KillProcessRequestEvent event) {
         String processId = event.getId();
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessListSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessListSubscriberTest.java
index fcd59e1ba50..77be537b9a1 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessListSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessListSubscriberTest.java
@@ -31,7 +31,6 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import java.util.Collections;
 
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -51,12 +50,11 @@ class ClusterProcessListSubscriberTest {
     }
     
     @Test
-    void assertLoadShowProcessListData() {
+    void assertPostShowProcessListData() {
         when(repository.getChildrenKeys(ComputeNode.getOnlineNodePath(InstanceType.JDBC))).thenReturn(Collections.emptyList());
         when(repository.getChildrenKeys(ComputeNode.getOnlineNodePath(InstanceType.PROXY))).thenReturn(Collections.singletonList("abc"));
         when(repository.getDirectly(any())).thenReturn(null);
-        ShowProcessListRequestEvent showProcessListRequestEvent = mock(ShowProcessListRequestEvent.class);
-        clusterProcessListSubscriber.loadShowProcessListData(showProcessListRequestEvent);
+        clusterProcessListSubscriber.postShowProcessListData(new ShowProcessListRequestEvent());
         verify(repository).persist(any(), any());
     }
 }
diff --git a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessListSubscriber.java b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessListSubscriber.java
index bf67b24a22a..c3ca49d42aa 100644
--- a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessListSubscriber.java
+++ b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessListSubscriber.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.mode.event.process.KillProcessRequestEvent;
 import org.apache.shardingsphere.mode.event.process.ShowProcessListRequestEvent;
 import org.apache.shardingsphere.mode.event.process.ShowProcessListResponseEvent;
+import org.apache.shardingsphere.mode.process.ProcessListSubscriber;
 
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -36,7 +37,7 @@ import java.util.Collections;
  * Standalone processlist subscriber.
  */
 @SuppressWarnings("UnstableApiUsage")
-public final class StandaloneProcessListSubscriber {
+public final class StandaloneProcessListSubscriber implements ProcessListSubscriber {
     
     private final EventBusContext eventBusContext;
     
@@ -47,23 +48,14 @@ public final class StandaloneProcessListSubscriber {
         eventBusContext.register(this);
     }
     
-    /**
-     * Post show process list data.
-     *
-     * @param event show process list request event
-     */
+    @Override
     @Subscribe
     public void postShowProcessListData(final ShowProcessListRequestEvent event) {
-        YamlProcessListContexts yamlContexts = swapper.swapToYamlConfiguration(ProcessRegistry.getInstance().getAllProcessContexts());
-        eventBusContext.post(new ShowProcessListResponseEvent(Collections.singleton(YamlEngine.marshal(yamlContexts))));
+        YamlProcessListContexts yamlProcessListContexts = swapper.swapToYamlConfiguration(ProcessRegistry.getInstance().getAllProcessContexts());
+        eventBusContext.post(new ShowProcessListResponseEvent(Collections.singleton(YamlEngine.marshal(yamlProcessListContexts))));
     }
     
-    /**
-     * Kill process.
-     *
-     * @param event kill process request event
-     * @throws SQLException SQL exception
-     */
+    @Override
     @Subscribe
     public void killProcess(final KillProcessRequestEvent event) throws SQLException {
         ProcessContext processContext = ProcessRegistry.getInstance().getProcessContext(event.getId());
diff --git a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessListSubscriberTest.java b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessListSubscriberTest.java
index 98299a5d054..f677f688e2a 100644
--- a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessListSubscriberTest.java
+++ b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessListSubscriberTest.java
@@ -37,7 +37,7 @@ class StandaloneProcessListSubscriberTest {
     void assertPostShowProcessListData() {
         ProcessRegistry processRegistry = mock(ProcessRegistry.class);
         when(ProcessRegistry.getInstance()).thenReturn(processRegistry);
-        new StandaloneProcessListSubscriber(new EventBusContext()).postShowProcessListData(mock(ShowProcessListRequestEvent.class));
+        new StandaloneProcessListSubscriber(new EventBusContext()).postShowProcessListData(new ShowProcessListRequestEvent());
         verify(processRegistry).getAllProcessContexts();
     }
 }
diff --git a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
index 09aa582d7db..19a64fd65d4 100644
--- a/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
+++ b/proxy/backend/type/mysql/src/main/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutor.java
@@ -70,7 +70,7 @@ public final class ShowProcessListExecutor implements DatabaseAdminQueryExecutor
      */
     @Subscribe
     public void receiveProcessListData(final ShowProcessListResponseEvent event) {
-        batchProcessContexts = event.getBatchProcessContexts();
+        batchProcessContexts = event.getProcessListContexts();
     }
     
     @Override