You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/11/09 02:29:20 UTC
[shardingsphere] branch master updated: Revise pr 18951 (#22014)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 76634216073 Revise pr 18951 (#22014)
76634216073 is described below
commit 7663421607367c3443adf78ca936485355aa6d06
Author: zhaojinchao <zh...@apache.org>
AuthorDate: Wed Nov 9 10:29:14 2022 +0800
Revise pr 18951 (#22014)
* Revise pr 18951
* Fix it
---
.../infra/instance/InstanceContext.java | 13 +++++++++
.../status/compute/event/WorkerIdEvent.java | 34 ++++++++++++++++++++++
.../compute/service/ComputeNodeStatusService.java | 1 +
.../watcher/ComputeNodeStateChangedWatcher.java | 4 +++
.../subscriber/StateChangedSubscriber.java | 11 +++++++
.../subscriber/StateChangedSubscriberTest.java | 7 +++++
.../empty_rules/cluster/show_compute_nodes.xml | 2 +-
7 files changed, 71 insertions(+), 1 deletion(-)
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
index aa57d857500..3eb59664b1e 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
@@ -75,6 +75,19 @@ public final class InstanceContext {
}
}
+ /**
+ * Update instance worker id.
+ *
+ * @param instanceId instance id
+ * @param workerId worker id
+ */
+ public void updateWorkerId(final String instanceId, final Long workerId) {
+ if (instance.getMetaData().getId().equals(instanceId)) {
+ instance.setWorkerId(workerId);
+ }
+ allClusterInstances.stream().filter(each -> each.getMetaData().getId().equals(instanceId)).forEach(each -> each.setWorkerId(workerId));
+ }
+
/**
* Update instance label.
*
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/WorkerIdEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/WorkerIdEvent.java
new file mode 100644
index 00000000000..4ce176ce66b
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/WorkerIdEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+
+/**
+ * Worker id event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class WorkerIdEvent implements GovernanceEvent {
+
+ private final String instanceId;
+
+ private final Long workerId;
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
index 6ad59749599..2fa0f494718 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
@@ -146,6 +146,7 @@ public final class ComputeNodeStatusService {
ComputeNodeInstance result = new ComputeNodeInstance(instanceMetaData);
result.setLabels(loadInstanceLabels(instanceMetaData.getId()));
result.switchState(loadInstanceStatus(instanceMetaData.getId()));
+ loadInstanceWorkerId(instanceMetaData.getId()).ifPresent(result::setWorkerId);
return result;
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
index 7e11bdc5d39..a17b49eefa4 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
@@ -68,6 +69,9 @@ public final class ComputeNodeStateChangedWatcher implements GovernanceWatcher<G
Collection<String> status = Strings.isNullOrEmpty(event.getValue()) ? new ArrayList<>() : YamlEngine.unmarshal(event.getValue(), Collection.class);
return Optional.of(new StateEvent(instanceId, status));
}
+ if (event.getKey().equals(ComputeNode.getInstanceWorkerIdNodePath(instanceId))) {
+ return Optional.of(new WorkerIdEvent(instanceId, Strings.isNullOrEmpty(event.getValue()) ? null : Long.valueOf(event.getValue())));
+ }
if (event.getKey().equals(ComputeNode.getInstanceLabelsNodePath(instanceId))) {
return Optional.of(new LabelsEvent(instanceId, Strings.isNullOrEmpty(event.getValue()) ? new ArrayList<>() : YamlEngine.unmarshal(event.getValue(), Collection.class)));
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
index 794e189ff51..20390036f02 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
@@ -30,6 +30,7 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
@@ -107,6 +108,16 @@ public final class StateChangedSubscriber {
contextManager.getInstanceContext().updateInstanceStatus(event.getInstanceId(), event.getStatus());
}
+ /**
+ * Renew instance worker id.
+ *
+ * @param event worker id event
+ */
+ @Subscribe
+ public synchronized void renew(final WorkerIdEvent event) {
+ contextManager.getInstanceContext().updateWorkerId(event.getInstanceId(), event.getWorkerId());
+ }
+
/**
* Renew instance labels.
*
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
index ba5f3eecbb8..85119638d73 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
@@ -42,6 +42,7 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
@@ -152,6 +153,12 @@ public final class StateChangedSubscriberTest {
assertThat(contextManager.getInstanceContext().getInstance().getState().getCurrentState(), is(StateType.CIRCUIT_BREAK));
}
+ @Test
+ public void assertRenewInstanceWorkerIdEvent() {
+ subscriber.renew(new WorkerIdEvent(contextManager.getInstanceContext().getInstance().getMetaData().getId(), 0L));
+ assertThat(contextManager.getInstanceContext().getInstance().getWorkerId(), is(0L));
+ }
+
@Test
public void assertRenewInstanceLabels() {
Collection<String> labels = Collections.singleton("test");
diff --git a/test/integration-test/test-suite/src/test/resources/cases/ral/dataset/empty_rules/cluster/show_compute_nodes.xml b/test/integration-test/test-suite/src/test/resources/cases/ral/dataset/empty_rules/cluster/show_compute_nodes.xml
index ff52a8a3b23..33e60507c67 100644
--- a/test/integration-test/test-suite/src/test/resources/cases/ral/dataset/empty_rules/cluster/show_compute_nodes.xml
+++ b/test/integration-test/test-suite/src/test/resources/cases/ral/dataset/empty_rules/cluster/show_compute_nodes.xml
@@ -25,5 +25,5 @@
<column name="worker_id" />
<column name="labels" />
</metadata>
- <row values=" | | 3307 | OK | Cluster | -1 | " />
+ <row values=" | | 3307 | OK | Cluster | 0 | " />
</dataset>