You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/05/04 15:15:30 UTC

[shardingsphere] branch master updated: Refactor ComputeNodeStateChangedWatcher (#25457)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b2650d1d1e Refactor ComputeNodeStateChangedWatcher (#25457)
2b2650d1d1e is described below

commit 2b2650d1d1e2960cb92d01981a52a39f7cfc3db7
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Thu May 4 23:15:21 2023 +0800

    Refactor ComputeNodeStateChangedWatcher (#25457)
---
 .../metadata/persist/node/ComputeNode.java         | 24 ++++++-------
 .../metadata/persist/node/ComputeNodeTest.java     |  2 +-
 .../subscriber/ProcessListChangedSubscriber.java   | 20 +++++------
 ...nt.java => KillLocalProcessCompletedEvent.java} |  4 +--
 ...rocessEvent.java => KillLocalProcessEvent.java} |  4 +--
 ...erEvent.java => ReportLocalProcessesEvent.java} |  4 +--
 .../watcher/ComputeNodeStateChangedWatcher.java    | 40 +++++++++++-----------
 .../ProcessListChangedSubscriberTest.java          | 28 ++++++++-------
 8 files changed, 64 insertions(+), 62 deletions(-)

diff --git a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ComputeNode.java b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ComputeNode.java
index 25dfd4dbd7f..c21c08cc872 100644
--- a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ComputeNode.java
+++ b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ComputeNode.java
@@ -38,9 +38,9 @@ public final class ComputeNode {
     
     private static final String LABELS_NODE = "labels";
     
-    private static final String PROCESS_TRIGGER = "process_trigger";
+    private static final String SHOW_PROCESS_LIST_TRIGGER = "process_trigger";
     
-    private static final String PROCESS_KILL = "process_kill";
+    private static final String KILL_PROCESS_TRIGGER = "process_kill";
     
     private static final String STATUS_NODE = "status";
     
@@ -77,21 +77,21 @@ public final class ComputeNode {
     }
     
     /**
-     * Get process trigger node path.
+     * Get show process list trigger node path.
      * 
-     * @return path of process trigger node path
+     * @return show process list trigger node path
      */
-    public static String getProcessTriggerNodePatch() {
-        return String.join("/", "", ROOT_NODE, COMPUTE_NODE, PROCESS_TRIGGER);
+    public static String getShowProcessListTriggerNodePath() {
+        return String.join("/", "", ROOT_NODE, COMPUTE_NODE, SHOW_PROCESS_LIST_TRIGGER);
     }
     
     /**
-     * Get process kill node path.
+     * Get kill process trigger node path.
      *
-     * @return path of process kill node path
+     * @return kill process trigger node path
      */
-    public static String getProcessKillNodePatch() {
-        return String.join("/", "", ROOT_NODE, COMPUTE_NODE, PROCESS_KILL);
+    public static String getKillProcessTriggerNodePath() {
+        return String.join("/", "", ROOT_NODE, COMPUTE_NODE, KILL_PROCESS_TRIGGER);
     }
     
     /**
@@ -102,7 +102,7 @@ public final class ComputeNode {
      * @return path of process trigger instance node path
      */
     public static String getProcessTriggerInstanceNodePath(final String instanceId, final String taskId) {
-        return String.join("/", "", ROOT_NODE, COMPUTE_NODE, PROCESS_TRIGGER, String.join(":", instanceId, taskId));
+        return String.join("/", "", ROOT_NODE, COMPUTE_NODE, SHOW_PROCESS_LIST_TRIGGER, String.join(":", instanceId, taskId));
     }
     
     /**
@@ -113,7 +113,7 @@ public final class ComputeNode {
      * @return path of process kill instance id node path
      */
     public static String getProcessKillInstanceIdNodePath(final String instanceId, final String processId) {
-        return String.join("/", "", ROOT_NODE, COMPUTE_NODE, PROCESS_KILL, String.join(":", instanceId, processId));
+        return String.join("/", "", ROOT_NODE, COMPUTE_NODE, KILL_PROCESS_TRIGGER, String.join(":", instanceId, processId));
     }
     
     /**
diff --git a/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodeTest.java b/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodeTest.java
index 725fc8eb35b..42b282e2580 100644
--- a/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodeTest.java
+++ b/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodeTest.java
@@ -39,7 +39,7 @@ class ComputeNodeTest {
     
     @Test
     void assertGetProcessTriggerNodePatch() {
-        assertThat(ComputeNode.getProcessTriggerNodePatch(), is("/nodes/compute_nodes/process_trigger"));
+        assertThat(ComputeNode.getShowProcessListTriggerNodePath(), is("/nodes/compute_nodes/process_trigger"));
     }
     
     @Test
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
index 0213e98375e..42bf40f9ad7 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
@@ -27,10 +27,10 @@ import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
 import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessCompletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessCompletedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesCompletedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesEvent;
 
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -60,7 +60,7 @@ public final class ProcessListChangedSubscriber {
      * @param event show process list trigger event
      */
     @Subscribe
-    public void reportLocalProcesses(final ShowProcessListTriggerEvent event) {
+    public void reportLocalProcesses(final ReportLocalProcessesEvent event) {
         if (!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId())) {
             return;
         }
@@ -83,13 +83,13 @@ public final class ProcessListChangedSubscriber {
     }
     
     /**
-     * Kill process.
+     * Kill local process.
      *
-     * @param event kill process id event
+     * @param event kill local process event
      * @throws SQLException SQL exception
      */
     @Subscribe
-    public synchronized void killProcess(final KillProcessEvent event) throws SQLException {
+    public synchronized void killLocalProcess(final KillLocalProcessEvent event) throws SQLException {
         if (!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId())) {
             return;
         }
@@ -103,12 +103,12 @@ public final class ProcessListChangedSubscriber {
     }
     
     /**
-     * Complete to kill process.
+     * Complete to kill local process.
      *
-     * @param event kill process completed event
+     * @param event kill local process completed event
      */
     @Subscribe
-    public synchronized void completeToKillProcess(final KillProcessCompletedEvent event) {
+    public synchronized void completeToKillLocalProcess(final KillLocalProcessCompletedEvent event) {
         ProcessOperationLockRegistry.getInstance().notify(event.getProcessId());
     }
 }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessCompletedEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillLocalProcessCompletedEvent.java
similarity index 90%
rename from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessCompletedEvent.java
rename to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillLocalProcessCompletedEvent.java
index 4d130fd4859..200cb2997ec 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessCompletedEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillLocalProcessCompletedEvent.java
@@ -22,11 +22,11 @@ import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
 
 /**
- * Kill process completed event.
+ * Kill local process completed event.
  */
 @RequiredArgsConstructor
 @Getter
-public final class KillProcessCompletedEvent implements GovernanceEvent {
+public final class KillLocalProcessCompletedEvent implements GovernanceEvent {
     
     private final String processId;
 }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillLocalProcessEvent.java
similarity index 92%
rename from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessEvent.java
rename to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillLocalProcessEvent.java
index 95ccbf5e396..fa48cf31ff6 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillLocalProcessEvent.java
@@ -22,11 +22,11 @@ import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
 
 /**
- * Kill process event.
+ * Kill local process event.
  */
 @RequiredArgsConstructor
 @Getter
-public final class KillProcessEvent implements GovernanceEvent {
+public final class KillLocalProcessEvent implements GovernanceEvent {
     
     private final String instanceId;
     
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ShowProcessListTriggerEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ReportLocalProcessesEvent.java
similarity index 91%
rename from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ShowProcessListTriggerEvent.java
rename to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ReportLocalProcessesEvent.java
index 1d737634ad7..4e149799680 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ShowProcessListTriggerEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ReportLocalProcessesEvent.java
@@ -22,11 +22,11 @@ import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
 
 /**
- * Show processlist trigger event.
+ * Report local processes event.
  */
 @RequiredArgsConstructor
 @Getter
-public final class ShowProcessListTriggerEvent implements GovernanceEvent {
+public final class ReportLocalProcessesEvent implements GovernanceEvent {
     
     private final String instanceId;
     
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
index 44aea4d6bc4..3c2d145c4e9 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -28,11 +28,11 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.Gover
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessCompletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessCompletedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesCompletedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -79,17 +79,17 @@ public final class ComputeNodeStateChangedWatcher implements GovernanceWatcher<G
         if (event.getKey().startsWith(ComputeNode.getOnlineInstanceNodePath())) {
             return createInstanceEvent(event);
         }
-        if (event.getKey().startsWith(ComputeNode.getProcessTriggerNodePatch())) {
-            return createShowProcessListTriggerEvent(event);
+        if (event.getKey().startsWith(ComputeNode.getShowProcessListTriggerNodePath())) {
+            return createReportLocalProcessesEvent(event);
         }
-        if (event.getKey().startsWith(ComputeNode.getProcessKillNodePatch())) {
-            return createKillProcessIdEvent(event);
+        if (event.getKey().startsWith(ComputeNode.getKillProcessTriggerNodePath())) {
+            return createKillLocalProcessEvent(event);
         }
         return Optional.empty();
     }
     
     private Optional<GovernanceEvent> createInstanceEvent(final DataChangedEvent event) {
-        Matcher matcher = matchInstanceOnlinePath(event.getKey());
+        Matcher matcher = getInstanceOnlinePathMatcher(event.getKey());
         if (matcher.find()) {
             ComputeNodeData computeNodeData = YamlEngine.unmarshal(event.getValue(), ComputeNodeData.class);
             InstanceMetaData instanceMetaData = InstanceMetaDataFactory.create(matcher.group(2),
@@ -104,17 +104,17 @@ public final class ComputeNodeStateChangedWatcher implements GovernanceWatcher<G
         return Optional.empty();
     }
     
-    private Matcher matchInstanceOnlinePath(final String onlineInstancePath) {
+    private Matcher getInstanceOnlinePathMatcher(final String onlineInstancePath) {
         return Pattern.compile(ComputeNode.getOnlineInstanceNodePath() + "/([\\S]+)/([\\S]+)$", Pattern.CASE_INSENSITIVE).matcher(onlineInstancePath);
     }
     
-    private Optional<GovernanceEvent> createShowProcessListTriggerEvent(final DataChangedEvent event) {
-        Matcher matcher = getShowProcessTriggerMatcher(event);
+    private Optional<GovernanceEvent> createReportLocalProcessesEvent(final DataChangedEvent event) {
+        Matcher matcher = getShowProcessListTriggerMatcher(event);
         if (!matcher.find()) {
             return Optional.empty();
         }
         if (Type.ADDED == event.getType()) {
-            return Optional.of(new ShowProcessListTriggerEvent(matcher.group(1), matcher.group(2)));
+            return Optional.of(new ReportLocalProcessesEvent(matcher.group(1), matcher.group(2)));
         }
         if (Type.DELETED == event.getType()) {
             return Optional.of(new ReportLocalProcessesCompletedEvent(matcher.group(2)));
@@ -122,26 +122,26 @@ public final class ComputeNodeStateChangedWatcher implements GovernanceWatcher<G
         return Optional.empty();
     }
     
-    private static Matcher getShowProcessTriggerMatcher(final DataChangedEvent event) {
-        return Pattern.compile(ComputeNode.getProcessTriggerNodePatch() + "/([\\S]+):([\\S]+)$", Pattern.CASE_INSENSITIVE).matcher(event.getKey());
+    private static Matcher getShowProcessListTriggerMatcher(final DataChangedEvent event) {
+        return Pattern.compile(ComputeNode.getShowProcessListTriggerNodePath() + "/([\\S]+):([\\S]+)$", Pattern.CASE_INSENSITIVE).matcher(event.getKey());
     }
     
-    private Optional<GovernanceEvent> createKillProcessIdEvent(final DataChangedEvent event) {
-        Matcher matcher = getKillProcessIdMatcher(event);
+    private Optional<GovernanceEvent> createKillLocalProcessEvent(final DataChangedEvent event) {
+        Matcher matcher = getKillProcessTriggerMatcher(event);
         if (!matcher.find()) {
             return Optional.empty();
         }
         if (Type.ADDED == event.getType()) {
-            return Optional.of(new KillProcessEvent(matcher.group(1), matcher.group(2)));
+            return Optional.of(new KillLocalProcessEvent(matcher.group(1), matcher.group(2)));
         }
         if (Type.DELETED == event.getType()) {
-            return Optional.of(new KillProcessCompletedEvent(matcher.group(2)));
+            return Optional.of(new KillLocalProcessCompletedEvent(matcher.group(2)));
         }
         return Optional.empty();
     }
     
-    private static Matcher getKillProcessIdMatcher(final DataChangedEvent event) {
-        Pattern pattern = Pattern.compile(ComputeNode.getProcessKillNodePatch() + "/([\\S]+):([\\S]+)$", Pattern.CASE_INSENSITIVE);
+    private static Matcher getKillProcessTriggerMatcher(final DataChangedEvent event) {
+        Pattern pattern = Pattern.compile(ComputeNode.getKillProcessTriggerNodePath() + "/([\\S]+):([\\S]+)$", Pattern.CASE_INSENSITIVE);
         return pattern.matcher(event.getKey());
     }
 }
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
index e57eac3c54b..31011afc267 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
@@ -35,10 +35,10 @@ import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
 import org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessCompletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessCompletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesCompletedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesEvent;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
@@ -47,7 +47,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Answers;
 import org.mockito.Mock;
-import org.mockito.internal.configuration.plugins.Plugins;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 import java.sql.SQLException;
@@ -73,6 +72,8 @@ class ProcessListChangedSubscriberTest {
     
     private ContextManager contextManager;
     
+    private RegistryCenter registryCenter;
+    
     @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private ShardingSphereDatabase database;
     
@@ -81,7 +82,8 @@ class ProcessListChangedSubscriberTest {
         contextManager = new ClusterContextManagerBuilder().build(createContextManagerBuilderParameter());
         contextManager.renewMetaDataContexts(new MetaDataContexts(contextManager.getMetaDataContexts().getPersistService(), new ShardingSphereMetaData(createDatabases(),
                 contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(), new ConfigurationProperties(new Properties()))));
-        subscriber = new ProcessListChangedSubscriber(new RegistryCenter(mock(ClusterPersistRepository.class), new EventBusContext(), mock(ProxyInstanceMetaData.class), null), contextManager);
+        registryCenter = new RegistryCenter(mock(ClusterPersistRepository.class), new EventBusContext(), mock(ProxyInstanceMetaData.class), null);
+        subscriber = new ProcessListChangedSubscriber(registryCenter, contextManager);
     }
     
     private ContextManagerBuilderParameter createContextManagerBuilderParameter() {
@@ -103,14 +105,14 @@ class ProcessListChangedSubscriberTest {
     }
     
     @Test
-    void assertReportLocalProcesses() throws ReflectiveOperationException {
+    void assertReportLocalProcesses() {
         String instanceId = contextManager.getInstanceContext().getInstance().getMetaData().getId();
         Process process = mock(Process.class);
         String processId = "foo_id";
         when(process.getId()).thenReturn(processId);
         ProcessRegistry.getInstance().add(process);
-        subscriber.reportLocalProcesses(new ShowProcessListTriggerEvent(instanceId, processId));
-        ClusterPersistRepository repository = ((RegistryCenter) Plugins.getMemberAccessor().get(ProcessListChangedSubscriber.class.getDeclaredField("registryCenter"), subscriber)).getRepository();
+        subscriber.reportLocalProcesses(new ReportLocalProcessesEvent(instanceId, processId));
+        ClusterPersistRepository repository = registryCenter.getRepository();
         verify(repository).persist("/execution_nodes/foo_id/" + instanceId,
                 "processes:" + System.lineSeparator() + "- completedUnitCount: 0\n  id: foo_id\n  idle: false\n  startMillis: 0\n  totalUnitCount: 0" + System.lineSeparator());
         verify(repository).delete("/nodes/compute_nodes/process_trigger/" + instanceId + ":foo_id");
@@ -134,16 +136,16 @@ class ProcessListChangedSubscriberTest {
     }
     
     @Test
-    void assertKillProcess() throws SQLException, ReflectiveOperationException {
+    void assertKillLocalProcess() throws SQLException {
         String instanceId = contextManager.getInstanceContext().getInstance().getMetaData().getId();
         String processId = "foo_id";
-        subscriber.killProcess(new KillProcessEvent(instanceId, processId));
-        ClusterPersistRepository repository = ((RegistryCenter) Plugins.getMemberAccessor().get(ProcessListChangedSubscriber.class.getDeclaredField("registryCenter"), subscriber)).getRepository();
+        subscriber.killLocalProcess(new KillLocalProcessEvent(instanceId, processId));
+        ClusterPersistRepository repository = registryCenter.getRepository();
         verify(repository).delete("/nodes/compute_nodes/process_kill/" + instanceId + ":foo_id");
     }
     
     @Test
-    void assertCompleteToKillProcess() {
+    void assertCompleteToKillLocalProcess() {
         String processId = "foo_id";
         long startMillis = System.currentTimeMillis();
         Executors.newFixedThreadPool(1).submit(() -> {
@@ -151,7 +153,7 @@ class ProcessListChangedSubscriberTest {
                 Thread.sleep(50L);
             } catch (final InterruptedException ignored) {
             }
-            subscriber.completeToKillProcess(new KillProcessCompletedEvent(processId));
+            subscriber.completeToKillLocalProcess(new KillLocalProcessCompletedEvent(processId));
         });
         waitUntilReleaseReady(processId);
         long currentMillis = System.currentTimeMillis();