You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2022/07/08 07:10:28 UTC
[shardingsphere] branch master updated: Remove worker-id from compute node instance (#18951)
This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ccfc10cb4da Remove worker-id from compute node instance (#18951)
ccfc10cb4da is described below
commit ccfc10cb4dadd5b9effad53391a7fc0477ad731f
Author: gin <ja...@163.com>
AuthorDate: Fri Jul 8 15:10:22 2022 +0800
Remove worker-id from compute node instance (#18951)
* Remove worker-id from compute node instance
* Fix log info
---
.../infra/instance/ComputeNodeInstance.java | 2 --
.../infra/instance/InstanceContext.java | 28 +-----------------
.../metadata/persist/node/ComputeNodeTest.java | 5 ++++
.../ClusterContextManagerCoordinator.java | 13 ---------
.../status/compute/event/WorkerIdEvent.java | 34 ----------------------
.../compute/service/ComputeNodeStatusService.java | 1 -
.../watcher/ComputeNodeStateChangedWatcher.java | 4 ---
.../generator/ClusterWorkerIdGenerator.java | 6 ++--
.../service/ComputeNodeStatusServiceTest.java | 6 ++++
.../ComputeNodeStateChangedWatcherTest.java | 19 ------------
10 files changed, 15 insertions(+), 103 deletions(-)
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
index 51a68a2e8e7..adda363dc40 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
@@ -40,8 +40,6 @@ public final class ComputeNodeInstance {
private Collection<String> labels;
- private Long workerId;
-
/**
* Set labels.
*
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
index 352bb7c8f2d..1ff9164163d 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
@@ -28,7 +28,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
-import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
@@ -77,17 +76,6 @@ public final class InstanceContext {
}
}
- /**
- * Update instance worker id.
- *
- * @param workerId worker id
- */
- public void updateWorkerId(final Long workerId) {
- if (!Objects.equals(workerId, instance.getWorkerId())) {
- instance.setWorkerId(workerId);
- }
- }
-
/**
* Update instance label.
*
@@ -101,15 +89,6 @@ public final class InstanceContext {
computeNodeInstances.stream().filter(each -> each.getInstanceMetaData().getId().equals(instanceId)).forEach(each -> each.setLabels(labels));
}
- /**
- * Get worker id.
- *
- * @return worker id
- */
- public long getWorkerId() {
- return instance.getWorkerId();
- }
-
/**
* Generate worker id.
*
@@ -117,12 +96,7 @@ public final class InstanceContext {
* @return worker id
*/
public long generateWorkerId(final Properties props) {
- Long result = instance.getWorkerId();
- if (null == result) {
- result = workerIdGenerator.generate(props);
- instance.setWorkerId(result);
- }
- return result;
+ return workerIdGenerator.generate(props);
}
/**
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
index 8d46c5fed38..8c0a24caa48 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
@@ -60,6 +60,11 @@ public final class ComputeNodeTest {
assertThat(ComputeNode.getInstanceWorkerIdNodePath("foo_instance"), is("/nodes/compute_nodes/worker_id/foo_instance"));
}
+ @Test
+ public void assertGetInstanceWorkerIdRootNodePath() {
+ assertThat(ComputeNode.getInstanceWorkerIdRootNodePath(), is("/nodes/compute_nodes/worker_id"));
+ }
+
@Test
public void assertGetInstanceIdByComputeNodePath() {
assertThat(ComputeNode.getInstanceIdByComputeNode("/nodes/compute_nodes/status/foo_instance_1"), is("foo_instance_1"));
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
index 690ef0b76cd..ad8f3a7e187 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
@@ -48,7 +48,6 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
@@ -219,18 +218,6 @@ public final class ClusterContextManagerCoordinator {
contextManager.getInstanceContext().updateInstanceStatus(event.getInstanceId(), event.getStatus());
}
- /**
- * Renew instance worker id.
- *
- * @param event worker id event
- */
- @Subscribe
- public synchronized void renew(final WorkerIdEvent event) {
- if (contextManager.getInstanceContext().getInstance().getInstanceMetaData().getId().equals(event.getInstanceId())) {
- contextManager.getInstanceContext().updateWorkerId(event.getWorkerId());
- }
- }
-
/**
* Renew instance labels.
*
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/WorkerIdEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/WorkerIdEvent.java
deleted file mode 100644
index a979386202b..00000000000
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/WorkerIdEvent.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
-
-/**
- * Worker id changed event.
- */
-@RequiredArgsConstructor
-@Getter
-public final class WorkerIdEvent implements GovernanceEvent {
-
- private final String instanceId;
-
- private final Long workerId;
-}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
index c197e90a2d7..611bd05308a 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
@@ -146,7 +146,6 @@ public final class ComputeNodeStatusService {
ComputeNodeInstance result = new ComputeNodeInstance(instanceMetaData);
result.setLabels(loadInstanceLabels(instanceMetaData.getId()));
result.switchState(loadInstanceStatus(instanceMetaData.getId()));
- loadInstanceWorkerId(instanceMetaData.getId()).ifPresent(result::setWorkerId);
return result;
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
index 35a5c1cc143..c1bfd093142 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -30,7 +30,6 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
@@ -67,9 +66,6 @@ public final class ComputeNodeStateChangedWatcher implements GovernanceWatcher<G
Collection<String> status = Strings.isNullOrEmpty(event.getValue()) ? new ArrayList<>() : YamlEngine.unmarshal(event.getValue(), Collection.class);
return Optional.of(new StateEvent(instanceId, status));
}
- if (event.getKey().equals(ComputeNode.getInstanceWorkerIdNodePath(instanceId))) {
- return Optional.of(new WorkerIdEvent(instanceId, Strings.isNullOrEmpty(event.getValue()) ? null : Long.valueOf(event.getValue())));
- }
if (event.getKey().equals(ComputeNode.getInstanceLabelsNodePath(instanceId))) {
return Optional.of(new LabelsEvent(instanceId, Strings.isNullOrEmpty(event.getValue()) ? new ArrayList<>() : YamlEngine.unmarshal(event.getValue(), Collection.class)));
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
index a69a369a82a..d388c278ce7 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
@@ -58,10 +58,10 @@ public final class ClusterWorkerIdGenerator implements WorkerIdGenerator {
reTryCount++;
result = generateSequentialId();
if (result > MAX_WORKER_ID) {
- result = result % 1024L;
+ result = result % MAX_WORKER_ID + 1;
}
if (reTryCount > MAX_RE_TRY) {
- throw new ShardingSphereException("System assigned work-id failed, assigned work-id was {}", result);
+ throw new ShardingSphereException("System assigned %s failed, assigned worker id was %s", WORKER_ID_KEY, result);
}
} while (isExist(result));
registryCenter.getComputeNodeStatusService().persistInstanceWorkerId(instanceMetaData.getId(), result);
@@ -80,7 +80,7 @@ public final class ClusterWorkerIdGenerator implements WorkerIdGenerator {
private void checkConfigured(final long generatedWorkerId, final Properties props) {
Optional<Long> configuredWorkerId = parseWorkerId(props);
if (configuredWorkerId.isPresent()) {
- log.warn("No need to configured {} in cluster mode, system assigned work-id was {}", WORKER_ID_KEY, generatedWorkerId);
+ log.warn("No need to configured {} in cluster mode, system assigned worker id was {}", WORKER_ID_KEY, generatedWorkerId);
}
}
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
index 3fe67e38a04..d08b616363b 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
@@ -112,4 +112,10 @@ public final class ComputeNodeStatusServiceTest {
ComputeNodeInstance actual = new ComputeNodeStatusService(repository).loadComputeNodeInstance(instanceMetaData);
assertThat(actual.getInstanceMetaData(), is(instanceMetaData));
}
+
+ @Test
+ public void assertGetUsedWorkerIds() {
+ new ComputeNodeStatusService(repository).getUsedWorkerIds();
+ verify(repository).getChildrenKeys(ComputeNode.getInstanceWorkerIdRootNodePath());
+ }
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
index b63eefc2544..0724b2d2c39 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
@@ -22,7 +22,6 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.Gover
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
import org.junit.Test;
@@ -55,24 +54,6 @@ public final class ComputeNodeStateChangedWatcherTest {
assertThat(((StateEvent) actual.get()).getInstanceId(), is("127.0.0.1@3307"));
}
- @Test
- public void assertCreateAddWorkerIdEvent() {
- Optional<GovernanceEvent> actual = new ComputeNodeStateChangedWatcher()
- .createGovernanceEvent(new DataChangedEvent("/nodes/compute_nodes/worker_id/127.0.0.1@3307", "123", Type.ADDED));
- assertTrue(actual.isPresent());
- assertThat(((WorkerIdEvent) actual.get()).getWorkerId(), is(123L));
- assertThat(((WorkerIdEvent) actual.get()).getInstanceId(), is("127.0.0.1@3307"));
- }
-
- @Test
- public void assertCreateUpdateWorkerIdEvent() {
- Optional<GovernanceEvent> actual = new ComputeNodeStateChangedWatcher()
- .createGovernanceEvent(new DataChangedEvent("/nodes/compute_nodes/worker_id/127.0.0.1@3307", "123", Type.UPDATED));
- assertTrue(actual.isPresent());
- assertThat(((WorkerIdEvent) actual.get()).getWorkerId(), is(123L));
- assertThat(((WorkerIdEvent) actual.get()).getInstanceId(), is("127.0.0.1@3307"));
- }
-
@Test
public void assertCreateAddLabelEvent() {
Optional<GovernanceEvent> actual = new ComputeNodeStateChangedWatcher()