You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/11/09 02:29:20 UTC

[shardingsphere] branch master updated: Revise pr 18951 (#22014)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 76634216073 Revise pr 18951 (#22014)
76634216073 is described below

commit 7663421607367c3443adf78ca936485355aa6d06
Author: zhaojinchao <zh...@apache.org>
AuthorDate: Wed Nov 9 10:29:14 2022 +0800

    Revise pr 18951 (#22014)
    
    * Revise pr 18951
    
    * Fix it
---
 .../infra/instance/InstanceContext.java            | 13 +++++++++
 .../status/compute/event/WorkerIdEvent.java        | 34 ++++++++++++++++++++++
 .../compute/service/ComputeNodeStatusService.java  |  1 +
 .../watcher/ComputeNodeStateChangedWatcher.java    |  4 +++
 .../subscriber/StateChangedSubscriber.java         | 11 +++++++
 .../subscriber/StateChangedSubscriberTest.java     |  7 +++++
 .../empty_rules/cluster/show_compute_nodes.xml     |  2 +-
 7 files changed, 71 insertions(+), 1 deletion(-)

diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
index aa57d857500..3eb59664b1e 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
@@ -75,6 +75,19 @@ public final class InstanceContext {
         }
     }
     
+    /**
+     * Update instance worker id.
+     *
+     * @param instanceId instance id
+     * @param workerId worker id
+     */
+    public void updateWorkerId(final String instanceId, final Long workerId) {
+        if (instance.getMetaData().getId().equals(instanceId)) {
+            instance.setWorkerId(workerId);
+        }
+        allClusterInstances.stream().filter(each -> each.getMetaData().getId().equals(instanceId)).forEach(each -> each.setWorkerId(workerId));
+    }
+    
     /**
      * Update instance label.
      * 
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/WorkerIdEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/WorkerIdEvent.java
new file mode 100644
index 00000000000..4ce176ce66b
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/WorkerIdEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+
+/**
+ * Worker id event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class WorkerIdEvent implements GovernanceEvent {
+    
+    private final String instanceId;
+    
+    private final Long workerId;
+}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
index 6ad59749599..2fa0f494718 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
@@ -146,6 +146,7 @@ public final class ComputeNodeStatusService {
         ComputeNodeInstance result = new ComputeNodeInstance(instanceMetaData);
         result.setLabels(loadInstanceLabels(instanceMetaData.getId()));
         result.switchState(loadInstanceStatus(instanceMetaData.getId()));
+        loadInstanceWorkerId(instanceMetaData.getId()).ifPresent(result::setWorkerId);
         return result;
     }
     
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
index 7e11bdc5d39..a17b49eefa4 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
 import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
@@ -68,6 +69,9 @@ public final class ComputeNodeStateChangedWatcher implements GovernanceWatcher<G
                 Collection<String> status = Strings.isNullOrEmpty(event.getValue()) ? new ArrayList<>() : YamlEngine.unmarshal(event.getValue(), Collection.class);
                 return Optional.of(new StateEvent(instanceId, status));
             }
+            if (event.getKey().equals(ComputeNode.getInstanceWorkerIdNodePath(instanceId))) {
+                return Optional.of(new WorkerIdEvent(instanceId, Strings.isNullOrEmpty(event.getValue()) ? null : Long.valueOf(event.getValue())));
+            }
             if (event.getKey().equals(ComputeNode.getInstanceLabelsNodePath(instanceId))) {
                 return Optional.of(new LabelsEvent(instanceId, Strings.isNullOrEmpty(event.getValue()) ? new ArrayList<>() : YamlEngine.unmarshal(event.getValue(), Collection.class)));
             }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
index 794e189ff51..20390036f02 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
@@ -30,6 +30,7 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
@@ -107,6 +108,16 @@ public final class StateChangedSubscriber {
         contextManager.getInstanceContext().updateInstanceStatus(event.getInstanceId(), event.getStatus());
     }
     
+    /**
+     * Renew instance worker id.
+     *
+     * @param event worker id event
+     */
+    @Subscribe
+    public synchronized void renew(final WorkerIdEvent event) {
+        contextManager.getInstanceContext().updateWorkerId(event.getInstanceId(), event.getWorkerId());
+    }
+    
     /**
      * Renew instance labels.
      * 
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
index ba5f3eecbb8..85119638d73 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
@@ -42,6 +42,7 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
@@ -152,6 +153,12 @@ public final class StateChangedSubscriberTest {
         assertThat(contextManager.getInstanceContext().getInstance().getState().getCurrentState(), is(StateType.CIRCUIT_BREAK));
     }
     
+    @Test
+    public void assertRenewInstanceWorkerIdEvent() {
+        subscriber.renew(new WorkerIdEvent(contextManager.getInstanceContext().getInstance().getMetaData().getId(), 0L));
+        assertThat(contextManager.getInstanceContext().getInstance().getWorkerId(), is(0L));
+    }
+    
     @Test
     public void assertRenewInstanceLabels() {
         Collection<String> labels = Collections.singleton("test");
diff --git a/test/integration-test/test-suite/src/test/resources/cases/ral/dataset/empty_rules/cluster/show_compute_nodes.xml b/test/integration-test/test-suite/src/test/resources/cases/ral/dataset/empty_rules/cluster/show_compute_nodes.xml
index ff52a8a3b23..33e60507c67 100644
--- a/test/integration-test/test-suite/src/test/resources/cases/ral/dataset/empty_rules/cluster/show_compute_nodes.xml
+++ b/test/integration-test/test-suite/src/test/resources/cases/ral/dataset/empty_rules/cluster/show_compute_nodes.xml
@@ -25,5 +25,5 @@
         <column name="worker_id" />
         <column name="labels" />
     </metadata>
-    <row values=" | | 3307 | OK | Cluster | -1 | " />
+    <row values=" | | 3307 | OK | Cluster | 0 | " />
 </dataset>