You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2022/07/08 07:10:28 UTC

[shardingsphere] branch master updated: Remove worker-id from compute node instance (#18951)

This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new ccfc10cb4da Remove worker-id from compute node instance (#18951)
ccfc10cb4da is described below

commit ccfc10cb4dadd5b9effad53391a7fc0477ad731f
Author: gin <ja...@163.com>
AuthorDate: Fri Jul 8 15:10:22 2022 +0800

    Remove worker-id from compute node instance (#18951)
    
    * Remove worker-id from compute node instance
    
    * Fix log info
---
 .../infra/instance/ComputeNodeInstance.java        |  2 --
 .../infra/instance/InstanceContext.java            | 28 +-----------------
 .../metadata/persist/node/ComputeNodeTest.java     |  5 ++++
 .../ClusterContextManagerCoordinator.java          | 13 ---------
 .../status/compute/event/WorkerIdEvent.java        | 34 ----------------------
 .../compute/service/ComputeNodeStatusService.java  |  1 -
 .../watcher/ComputeNodeStateChangedWatcher.java    |  4 ---
 .../generator/ClusterWorkerIdGenerator.java        |  6 ++--
 .../service/ComputeNodeStatusServiceTest.java      |  6 ++++
 .../ComputeNodeStateChangedWatcherTest.java        | 19 ------------
 10 files changed, 15 insertions(+), 103 deletions(-)

diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
index 51a68a2e8e7..adda363dc40 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
@@ -40,8 +40,6 @@ public final class ComputeNodeInstance {
     
     private Collection<String> labels;
     
-    private Long workerId;
-    
     /**
      * Set labels.
      *
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
index 352bb7c8f2d..1ff9164163d 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
@@ -28,7 +28,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 
@@ -77,17 +76,6 @@ public final class InstanceContext {
         }
     }
     
-    /**
-     * Update instance worker id.
-     * 
-     * @param workerId worker id
-     */
-    public void updateWorkerId(final Long workerId) {
-        if (!Objects.equals(workerId, instance.getWorkerId())) {
-            instance.setWorkerId(workerId);
-        }
-    }
-    
     /**
      * Update instance label.
      * 
@@ -101,15 +89,6 @@ public final class InstanceContext {
         computeNodeInstances.stream().filter(each -> each.getInstanceMetaData().getId().equals(instanceId)).forEach(each -> each.setLabels(labels));
     }
     
-    /**
-     * Get worker id.
-     *
-     * @return worker id
-     */
-    public long getWorkerId() {
-        return instance.getWorkerId();
-    }
-    
     /**
      * Generate worker id.
      *
@@ -117,12 +96,7 @@ public final class InstanceContext {
      * @return worker id
      */
     public long generateWorkerId(final Properties props) {
-        Long result = instance.getWorkerId();
-        if (null == result) {
-            result = workerIdGenerator.generate(props);
-            instance.setWorkerId(result);
-        }
-        return result;
+        return workerIdGenerator.generate(props);
     }
     
     /**
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
index 8d46c5fed38..8c0a24caa48 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
@@ -60,6 +60,11 @@ public final class ComputeNodeTest {
         assertThat(ComputeNode.getInstanceWorkerIdNodePath("foo_instance"), is("/nodes/compute_nodes/worker_id/foo_instance"));
     }
     
+    @Test
+    public void assertGetInstanceWorkerIdRootNodePath() {
+        assertThat(ComputeNode.getInstanceWorkerIdRootNodePath(), is("/nodes/compute_nodes/worker_id"));
+    }
+    
     @Test
     public void assertGetInstanceIdByComputeNodePath() {
         assertThat(ComputeNode.getInstanceIdByComputeNode("/nodes/compute_nodes/status/foo_instance_1"), is("foo_instance_1"));
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
index 690ef0b76cd..ad8f3a7e187 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
@@ -48,7 +48,6 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
 import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
@@ -219,18 +218,6 @@ public final class ClusterContextManagerCoordinator {
         contextManager.getInstanceContext().updateInstanceStatus(event.getInstanceId(), event.getStatus());
     }
     
-    /**
-     * Renew instance worker id.
-     *
-     * @param event worker id event
-     */
-    @Subscribe
-    public synchronized void renew(final WorkerIdEvent event) {
-        if (contextManager.getInstanceContext().getInstance().getInstanceMetaData().getId().equals(event.getInstanceId())) {
-            contextManager.getInstanceContext().updateWorkerId(event.getWorkerId());
-        }
-    }
-    
     /**
      * Renew instance labels.
      * 
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/WorkerIdEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/WorkerIdEvent.java
deleted file mode 100644
index a979386202b..00000000000
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/WorkerIdEvent.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
-
-/**
- * Worker id changed event.
- */
-@RequiredArgsConstructor
-@Getter
-public final class WorkerIdEvent implements GovernanceEvent {
-    
-    private final String instanceId;
-    
-    private final Long workerId;
-}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
index c197e90a2d7..611bd05308a 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
@@ -146,7 +146,6 @@ public final class ComputeNodeStatusService {
         ComputeNodeInstance result = new ComputeNodeInstance(instanceMetaData);
         result.setLabels(loadInstanceLabels(instanceMetaData.getId()));
         result.switchState(loadInstanceStatus(instanceMetaData.getId()));
-        loadInstanceWorkerId(instanceMetaData.getId()).ifPresent(result::setWorkerId);
         return result;
     }
     
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
index 35a5c1cc143..c1bfd093142 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -30,7 +30,6 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
 import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
@@ -67,9 +66,6 @@ public final class ComputeNodeStateChangedWatcher implements GovernanceWatcher<G
                 Collection<String> status = Strings.isNullOrEmpty(event.getValue()) ? new ArrayList<>() : YamlEngine.unmarshal(event.getValue(), Collection.class);
                 return Optional.of(new StateEvent(instanceId, status));
             }
-            if (event.getKey().equals(ComputeNode.getInstanceWorkerIdNodePath(instanceId))) {
-                return Optional.of(new WorkerIdEvent(instanceId, Strings.isNullOrEmpty(event.getValue()) ? null : Long.valueOf(event.getValue())));
-            }
             if (event.getKey().equals(ComputeNode.getInstanceLabelsNodePath(instanceId))) {
                 return Optional.of(new LabelsEvent(instanceId, Strings.isNullOrEmpty(event.getValue()) ? new ArrayList<>() : YamlEngine.unmarshal(event.getValue(), Collection.class)));
             }
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
index a69a369a82a..d388c278ce7 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
@@ -58,10 +58,10 @@ public final class ClusterWorkerIdGenerator implements WorkerIdGenerator {
             reTryCount++;
             result = generateSequentialId();
             if (result > MAX_WORKER_ID) {
-                result = result % 1024L;
+                result = result % MAX_WORKER_ID + 1;
             }
             if (reTryCount > MAX_RE_TRY) {
-                throw new ShardingSphereException("System assigned work-id failed, assigned work-id was {}", result);
+                throw new ShardingSphereException("System assigned %s failed, assigned worker id was %s", WORKER_ID_KEY, result);
             }
         } while (isExist(result));
         registryCenter.getComputeNodeStatusService().persistInstanceWorkerId(instanceMetaData.getId(), result);
@@ -80,7 +80,7 @@ public final class ClusterWorkerIdGenerator implements WorkerIdGenerator {
     private void checkConfigured(final long generatedWorkerId, final Properties props) {
         Optional<Long> configuredWorkerId = parseWorkerId(props);
         if (configuredWorkerId.isPresent()) {
-            log.warn("No need to configured {} in cluster mode, system assigned work-id was {}", WORKER_ID_KEY, generatedWorkerId);
+            log.warn("No need to configured {} in cluster mode, system assigned worker id was {}", WORKER_ID_KEY, generatedWorkerId);
         }
     }
 }
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
index 3fe67e38a04..d08b616363b 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
@@ -112,4 +112,10 @@ public final class ComputeNodeStatusServiceTest {
         ComputeNodeInstance actual = new ComputeNodeStatusService(repository).loadComputeNodeInstance(instanceMetaData);
         assertThat(actual.getInstanceMetaData(), is(instanceMetaData));
     }
+    
+    @Test
+    public void assertGetUsedWorkerIds() {
+        new ComputeNodeStatusService(repository).getUsedWorkerIds();
+        verify(repository).getChildrenKeys(ComputeNode.getInstanceWorkerIdRootNodePath());
+    }
 }
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
index b63eefc2544..0724b2d2c39 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
@@ -22,7 +22,6 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.Gover
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
 import org.junit.Test;
@@ -55,24 +54,6 @@ public final class ComputeNodeStateChangedWatcherTest {
         assertThat(((StateEvent) actual.get()).getInstanceId(), is("127.0.0.1@3307"));
     }
     
-    @Test
-    public void assertCreateAddWorkerIdEvent() {
-        Optional<GovernanceEvent> actual = new ComputeNodeStateChangedWatcher()
-                .createGovernanceEvent(new DataChangedEvent("/nodes/compute_nodes/worker_id/127.0.0.1@3307", "123", Type.ADDED));
-        assertTrue(actual.isPresent());
-        assertThat(((WorkerIdEvent) actual.get()).getWorkerId(), is(123L));
-        assertThat(((WorkerIdEvent) actual.get()).getInstanceId(), is("127.0.0.1@3307"));
-    }
-    
-    @Test
-    public void assertCreateUpdateWorkerIdEvent() {
-        Optional<GovernanceEvent> actual = new ComputeNodeStateChangedWatcher()
-                .createGovernanceEvent(new DataChangedEvent("/nodes/compute_nodes/worker_id/127.0.0.1@3307", "123", Type.UPDATED));
-        assertTrue(actual.isPresent());
-        assertThat(((WorkerIdEvent) actual.get()).getWorkerId(), is(123L));
-        assertThat(((WorkerIdEvent) actual.get()).getInstanceId(), is("127.0.0.1@3307"));
-    }
-    
     @Test
     public void assertCreateAddLabelEvent() {
         Optional<GovernanceEvent> actual = new ComputeNodeStateChangedWatcher()