You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/05/04 15:15:30 UTC
[shardingsphere] branch master updated: Refactor ComputeNodeStateChangedWatcher (#25457)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 2b2650d1d1e Refactor ComputeNodeStateChangedWatcher (#25457)
2b2650d1d1e is described below
commit 2b2650d1d1e2960cb92d01981a52a39f7cfc3db7
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Thu May 4 23:15:21 2023 +0800
Refactor ComputeNodeStateChangedWatcher (#25457)
---
.../metadata/persist/node/ComputeNode.java | 24 ++++++-------
.../metadata/persist/node/ComputeNodeTest.java | 2 +-
.../subscriber/ProcessListChangedSubscriber.java | 20 +++++------
...nt.java => KillLocalProcessCompletedEvent.java} | 4 +--
...rocessEvent.java => KillLocalProcessEvent.java} | 4 +--
...erEvent.java => ReportLocalProcessesEvent.java} | 4 +--
.../watcher/ComputeNodeStateChangedWatcher.java | 40 +++++++++++-----------
.../ProcessListChangedSubscriberTest.java | 28 ++++++++-------
8 files changed, 64 insertions(+), 62 deletions(-)
diff --git a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ComputeNode.java b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ComputeNode.java
index 25dfd4dbd7f..c21c08cc872 100644
--- a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ComputeNode.java
+++ b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ComputeNode.java
@@ -38,9 +38,9 @@ public final class ComputeNode {
private static final String LABELS_NODE = "labels";
- private static final String PROCESS_TRIGGER = "process_trigger";
+ private static final String SHOW_PROCESS_LIST_TRIGGER = "process_trigger";
- private static final String PROCESS_KILL = "process_kill";
+ private static final String KILL_PROCESS_TRIGGER = "process_kill";
private static final String STATUS_NODE = "status";
@@ -77,21 +77,21 @@ public final class ComputeNode {
}
/**
- * Get process trigger node path.
+ * Get show process list trigger node path.
*
- * @return path of process trigger node path
+ * @return show process list trigger node path
*/
- public static String getProcessTriggerNodePatch() {
- return String.join("/", "", ROOT_NODE, COMPUTE_NODE, PROCESS_TRIGGER);
+ public static String getShowProcessListTriggerNodePath() {
+ return String.join("/", "", ROOT_NODE, COMPUTE_NODE, SHOW_PROCESS_LIST_TRIGGER);
}
/**
- * Get process kill node path.
+ * Get kill process trigger node path.
*
- * @return path of process kill node path
+ * @return kill process trigger node path
*/
- public static String getProcessKillNodePatch() {
- return String.join("/", "", ROOT_NODE, COMPUTE_NODE, PROCESS_KILL);
+ public static String getKillProcessTriggerNodePath() {
+ return String.join("/", "", ROOT_NODE, COMPUTE_NODE, KILL_PROCESS_TRIGGER);
}
/**
@@ -102,7 +102,7 @@ public final class ComputeNode {
* @return path of process trigger instance node path
*/
public static String getProcessTriggerInstanceNodePath(final String instanceId, final String taskId) {
- return String.join("/", "", ROOT_NODE, COMPUTE_NODE, PROCESS_TRIGGER, String.join(":", instanceId, taskId));
+ return String.join("/", "", ROOT_NODE, COMPUTE_NODE, SHOW_PROCESS_LIST_TRIGGER, String.join(":", instanceId, taskId));
}
/**
@@ -113,7 +113,7 @@ public final class ComputeNode {
* @return path of process kill instance id node path
*/
public static String getProcessKillInstanceIdNodePath(final String instanceId, final String processId) {
- return String.join("/", "", ROOT_NODE, COMPUTE_NODE, PROCESS_KILL, String.join(":", instanceId, processId));
+ return String.join("/", "", ROOT_NODE, COMPUTE_NODE, KILL_PROCESS_TRIGGER, String.join(":", instanceId, processId));
}
/**
diff --git a/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodeTest.java b/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodeTest.java
index 725fc8eb35b..42b282e2580 100644
--- a/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodeTest.java
+++ b/kernel/metadata/core/src/test/java/org/apache/shardingsphere/metadata/persist/node/ComputeNodeTest.java
@@ -39,7 +39,7 @@ class ComputeNodeTest {
@Test
void assertGetProcessTriggerNodePatch() {
- assertThat(ComputeNode.getProcessTriggerNodePatch(), is("/nodes/compute_nodes/process_trigger"));
+ assertThat(ComputeNode.getShowProcessListTriggerNodePath(), is("/nodes/compute_nodes/process_trigger"));
}
@Test
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
index 0213e98375e..42bf40f9ad7 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
@@ -27,10 +27,10 @@ import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessCompletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessCompletedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesCompletedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesEvent;
import java.sql.SQLException;
import java.sql.Statement;
@@ -60,7 +60,7 @@ public final class ProcessListChangedSubscriber {
* @param event show process list trigger event
*/
@Subscribe
- public void reportLocalProcesses(final ShowProcessListTriggerEvent event) {
+ public void reportLocalProcesses(final ReportLocalProcessesEvent event) {
if (!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId())) {
return;
}
@@ -83,13 +83,13 @@ public final class ProcessListChangedSubscriber {
}
/**
- * Kill process.
+ * Kill local process.
*
- * @param event kill process id event
+ * @param event kill local process event
* @throws SQLException SQL exception
*/
@Subscribe
- public synchronized void killProcess(final KillProcessEvent event) throws SQLException {
+ public synchronized void killLocalProcess(final KillLocalProcessEvent event) throws SQLException {
if (!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId())) {
return;
}
@@ -103,12 +103,12 @@ public final class ProcessListChangedSubscriber {
}
/**
- * Complete to kill process.
+ * Complete to kill local process.
*
- * @param event kill process completed event
+ * @param event kill local process completed event
*/
@Subscribe
- public synchronized void completeToKillProcess(final KillProcessCompletedEvent event) {
+ public synchronized void completeToKillLocalProcess(final KillLocalProcessCompletedEvent event) {
ProcessOperationLockRegistry.getInstance().notify(event.getProcessId());
}
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessCompletedEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillLocalProcessCompletedEvent.java
similarity index 90%
rename from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessCompletedEvent.java
rename to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillLocalProcessCompletedEvent.java
index 4d130fd4859..200cb2997ec 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessCompletedEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillLocalProcessCompletedEvent.java
@@ -22,11 +22,11 @@ import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
/**
- * Kill process completed event.
+ * Kill local process completed event.
*/
@RequiredArgsConstructor
@Getter
-public final class KillProcessCompletedEvent implements GovernanceEvent {
+public final class KillLocalProcessCompletedEvent implements GovernanceEvent {
private final String processId;
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillLocalProcessEvent.java
similarity index 92%
rename from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessEvent.java
rename to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillLocalProcessEvent.java
index 95ccbf5e396..fa48cf31ff6 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillProcessEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/KillLocalProcessEvent.java
@@ -22,11 +22,11 @@ import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
/**
- * Kill process event.
+ * Kill local process event.
*/
@RequiredArgsConstructor
@Getter
-public final class KillProcessEvent implements GovernanceEvent {
+public final class KillLocalProcessEvent implements GovernanceEvent {
private final String instanceId;
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ShowProcessListTriggerEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ReportLocalProcessesEvent.java
similarity index 91%
rename from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ShowProcessListTriggerEvent.java
rename to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ReportLocalProcessesEvent.java
index 1d737634ad7..4e149799680 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ShowProcessListTriggerEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ReportLocalProcessesEvent.java
@@ -22,11 +22,11 @@ import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
/**
- * Show processlist trigger event.
+ * Report local processes event.
*/
@RequiredArgsConstructor
@Getter
-public final class ShowProcessListTriggerEvent implements GovernanceEvent {
+public final class ReportLocalProcessesEvent implements GovernanceEvent {
private final String instanceId;
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
index 44aea4d6bc4..3c2d145c4e9 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -28,11 +28,11 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.Gover
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessCompletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessCompletedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesCompletedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -79,17 +79,17 @@ public final class ComputeNodeStateChangedWatcher implements GovernanceWatcher<G
if (event.getKey().startsWith(ComputeNode.getOnlineInstanceNodePath())) {
return createInstanceEvent(event);
}
- if (event.getKey().startsWith(ComputeNode.getProcessTriggerNodePatch())) {
- return createShowProcessListTriggerEvent(event);
+ if (event.getKey().startsWith(ComputeNode.getShowProcessListTriggerNodePath())) {
+ return createReportLocalProcessesEvent(event);
}
- if (event.getKey().startsWith(ComputeNode.getProcessKillNodePatch())) {
- return createKillProcessIdEvent(event);
+ if (event.getKey().startsWith(ComputeNode.getKillProcessTriggerNodePath())) {
+ return createKillLocalProcessEvent(event);
}
return Optional.empty();
}
private Optional<GovernanceEvent> createInstanceEvent(final DataChangedEvent event) {
- Matcher matcher = matchInstanceOnlinePath(event.getKey());
+ Matcher matcher = getInstanceOnlinePathMatcher(event.getKey());
if (matcher.find()) {
ComputeNodeData computeNodeData = YamlEngine.unmarshal(event.getValue(), ComputeNodeData.class);
InstanceMetaData instanceMetaData = InstanceMetaDataFactory.create(matcher.group(2),
@@ -104,17 +104,17 @@ public final class ComputeNodeStateChangedWatcher implements GovernanceWatcher<G
return Optional.empty();
}
- private Matcher matchInstanceOnlinePath(final String onlineInstancePath) {
+ private Matcher getInstanceOnlinePathMatcher(final String onlineInstancePath) {
return Pattern.compile(ComputeNode.getOnlineInstanceNodePath() + "/([\\S]+)/([\\S]+)$", Pattern.CASE_INSENSITIVE).matcher(onlineInstancePath);
}
- private Optional<GovernanceEvent> createShowProcessListTriggerEvent(final DataChangedEvent event) {
- Matcher matcher = getShowProcessTriggerMatcher(event);
+ private Optional<GovernanceEvent> createReportLocalProcessesEvent(final DataChangedEvent event) {
+ Matcher matcher = getShowProcessListTriggerMatcher(event);
if (!matcher.find()) {
return Optional.empty();
}
if (Type.ADDED == event.getType()) {
- return Optional.of(new ShowProcessListTriggerEvent(matcher.group(1), matcher.group(2)));
+ return Optional.of(new ReportLocalProcessesEvent(matcher.group(1), matcher.group(2)));
}
if (Type.DELETED == event.getType()) {
return Optional.of(new ReportLocalProcessesCompletedEvent(matcher.group(2)));
@@ -122,26 +122,26 @@ public final class ComputeNodeStateChangedWatcher implements GovernanceWatcher<G
return Optional.empty();
}
- private static Matcher getShowProcessTriggerMatcher(final DataChangedEvent event) {
- return Pattern.compile(ComputeNode.getProcessTriggerNodePatch() + "/([\\S]+):([\\S]+)$", Pattern.CASE_INSENSITIVE).matcher(event.getKey());
+ private static Matcher getShowProcessListTriggerMatcher(final DataChangedEvent event) {
+ return Pattern.compile(ComputeNode.getShowProcessListTriggerNodePath() + "/([\\S]+):([\\S]+)$", Pattern.CASE_INSENSITIVE).matcher(event.getKey());
}
- private Optional<GovernanceEvent> createKillProcessIdEvent(final DataChangedEvent event) {
- Matcher matcher = getKillProcessIdMatcher(event);
+ private Optional<GovernanceEvent> createKillLocalProcessEvent(final DataChangedEvent event) {
+ Matcher matcher = getKillProcessTriggerMatcher(event);
if (!matcher.find()) {
return Optional.empty();
}
if (Type.ADDED == event.getType()) {
- return Optional.of(new KillProcessEvent(matcher.group(1), matcher.group(2)));
+ return Optional.of(new KillLocalProcessEvent(matcher.group(1), matcher.group(2)));
}
if (Type.DELETED == event.getType()) {
- return Optional.of(new KillProcessCompletedEvent(matcher.group(2)));
+ return Optional.of(new KillLocalProcessCompletedEvent(matcher.group(2)));
}
return Optional.empty();
}
- private static Matcher getKillProcessIdMatcher(final DataChangedEvent event) {
- Pattern pattern = Pattern.compile(ComputeNode.getProcessKillNodePatch() + "/([\\S]+):([\\S]+)$", Pattern.CASE_INSENSITIVE);
+ private static Matcher getKillProcessTriggerMatcher(final DataChangedEvent event) {
+ Pattern pattern = Pattern.compile(ComputeNode.getKillProcessTriggerNodePath() + "/([\\S]+):([\\S]+)$", Pattern.CASE_INSENSITIVE);
return pattern.matcher(event.getKey());
}
}
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
index e57eac3c54b..31011afc267 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
@@ -35,10 +35,10 @@ import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
import org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessCompletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessCompletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesCompletedEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesEvent;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
@@ -47,7 +47,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.Mock;
-import org.mockito.internal.configuration.plugins.Plugins;
import org.mockito.junit.jupiter.MockitoExtension;
import java.sql.SQLException;
@@ -73,6 +72,8 @@ class ProcessListChangedSubscriberTest {
private ContextManager contextManager;
+ private RegistryCenter registryCenter;
+
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private ShardingSphereDatabase database;
@@ -81,7 +82,8 @@ class ProcessListChangedSubscriberTest {
contextManager = new ClusterContextManagerBuilder().build(createContextManagerBuilderParameter());
contextManager.renewMetaDataContexts(new MetaDataContexts(contextManager.getMetaDataContexts().getPersistService(), new ShardingSphereMetaData(createDatabases(),
contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(), new ConfigurationProperties(new Properties()))));
- subscriber = new ProcessListChangedSubscriber(new RegistryCenter(mock(ClusterPersistRepository.class), new EventBusContext(), mock(ProxyInstanceMetaData.class), null), contextManager);
+ registryCenter = new RegistryCenter(mock(ClusterPersistRepository.class), new EventBusContext(), mock(ProxyInstanceMetaData.class), null);
+ subscriber = new ProcessListChangedSubscriber(registryCenter, contextManager);
}
private ContextManagerBuilderParameter createContextManagerBuilderParameter() {
@@ -103,14 +105,14 @@ class ProcessListChangedSubscriberTest {
}
@Test
- void assertReportLocalProcesses() throws ReflectiveOperationException {
+ void assertReportLocalProcesses() {
String instanceId = contextManager.getInstanceContext().getInstance().getMetaData().getId();
Process process = mock(Process.class);
String processId = "foo_id";
when(process.getId()).thenReturn(processId);
ProcessRegistry.getInstance().add(process);
- subscriber.reportLocalProcesses(new ShowProcessListTriggerEvent(instanceId, processId));
- ClusterPersistRepository repository = ((RegistryCenter) Plugins.getMemberAccessor().get(ProcessListChangedSubscriber.class.getDeclaredField("registryCenter"), subscriber)).getRepository();
+ subscriber.reportLocalProcesses(new ReportLocalProcessesEvent(instanceId, processId));
+ ClusterPersistRepository repository = registryCenter.getRepository();
verify(repository).persist("/execution_nodes/foo_id/" + instanceId,
"processes:" + System.lineSeparator() + "- completedUnitCount: 0\n id: foo_id\n idle: false\n startMillis: 0\n totalUnitCount: 0" + System.lineSeparator());
verify(repository).delete("/nodes/compute_nodes/process_trigger/" + instanceId + ":foo_id");
@@ -134,16 +136,16 @@ class ProcessListChangedSubscriberTest {
}
@Test
- void assertKillProcess() throws SQLException, ReflectiveOperationException {
+ void assertKillLocalProcess() throws SQLException {
String instanceId = contextManager.getInstanceContext().getInstance().getMetaData().getId();
String processId = "foo_id";
- subscriber.killProcess(new KillProcessEvent(instanceId, processId));
- ClusterPersistRepository repository = ((RegistryCenter) Plugins.getMemberAccessor().get(ProcessListChangedSubscriber.class.getDeclaredField("registryCenter"), subscriber)).getRepository();
+ subscriber.killLocalProcess(new KillLocalProcessEvent(instanceId, processId));
+ ClusterPersistRepository repository = registryCenter.getRepository();
verify(repository).delete("/nodes/compute_nodes/process_kill/" + instanceId + ":foo_id");
}
@Test
- void assertCompleteToKillProcess() {
+ void assertCompleteToKillLocalProcess() {
String processId = "foo_id";
long startMillis = System.currentTimeMillis();
Executors.newFixedThreadPool(1).submit(() -> {
@@ -151,7 +153,7 @@ class ProcessListChangedSubscriberTest {
Thread.sleep(50L);
} catch (final InterruptedException ignored) {
}
- subscriber.completeToKillProcess(new KillProcessCompletedEvent(processId));
+ subscriber.completeToKillLocalProcess(new KillLocalProcessCompletedEvent(processId));
});
waitUntilReleaseReady(processId);
long currentMillis = System.currentTimeMillis();