You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2022/07/27 06:07:20 UTC
[shardingsphere] branch master updated: Optimizing worker-id generation by in-memory computing in cluster mode (#19563)
This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f14af124ded Optimizing worker-id generation by in-memory computing in cluster mode (#19563)
f14af124ded is described below
commit f14af124dedfa809d7be91b6ef005948efdb3deb
Author: gin <ja...@163.com>
AuthorDate: Wed Jul 27 14:07:14 2022 +0800
Optimizing worker-id generation by in-memory computing in cluster mode (#19563)
* Optimizing worker-id generation by in-memory computing in cluster mode
* Change workerId to primitive type
---
.../keygen/SnowflakeKeyGenerateAlgorithm.java | 33 ++++--------
.../keygen/SnowflakeKeyGenerateAlgorithmTest.java | 42 +++++++++++++--
.../keygen/fixture/WorkerIdGeneratorFixture.java | 2 +
.../keygen/CosIdSnowflakeKeyGenerateAlgorithm.java | 18 +------
.../infra/instance/ComputeNodeInstance.java | 2 +
.../infra/instance/InstanceContext.java | 13 ++++-
.../infra/instance/InstanceContextTest.java | 8 +++
.../compute/service/ComputeNodeStatusService.java | 11 ++--
.../generator/ClusterWorkerIdGenerator.java | 62 +++++++++++++---------
.../service/ComputeNodeStatusServiceTest.java | 2 +-
.../generator/ClusterWorkerIdGeneratorTest.java | 11 +++-
11 files changed, 126 insertions(+), 78 deletions(-)
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/algorithm/keygen/SnowflakeKeyGenerateAlgorithm.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/algorithm/keygen/SnowflakeKeyGenerateAlgorithm.java
index f97783428ac..bb02bdab934 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/algorithm/keygen/SnowflakeKeyGenerateAlgorithm.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/algorithm/keygen/SnowflakeKeyGenerateAlgorithm.java
@@ -43,8 +43,6 @@ public final class SnowflakeKeyGenerateAlgorithm implements KeyGenerateAlgorithm
public static final long EPOCH;
- private static final String WORKER_ID_KEY = "worker-id";
-
private static final String MAX_VIBRATION_OFFSET_KEY = "max-vibration-offset";
private static final String MAX_TOLERATE_TIME_DIFFERENCE_MILLISECONDS_KEY = "max-tolerate-time-difference-milliseconds";
@@ -59,8 +57,6 @@ public final class SnowflakeKeyGenerateAlgorithm implements KeyGenerateAlgorithm
private static final long TIMESTAMP_LEFT_SHIFT_BITS = WORKER_ID_LEFT_SHIFT_BITS + WORKER_ID_BITS;
- private static final long WORKER_ID_MAX_VALUE = (1L << WORKER_ID_BITS) - 1;
-
private static final int DEFAULT_VIBRATION_VALUE = 1;
private static final int MAX_TOLERATE_TIME_DIFFERENCE_MILLISECONDS = 10;
@@ -73,8 +69,6 @@ public final class SnowflakeKeyGenerateAlgorithm implements KeyGenerateAlgorithm
@Getter
private Properties props;
- private long workerId;
-
private int maxVibrationOffset;
private int maxTolerateTimeDifferenceMilliseconds;
@@ -85,6 +79,8 @@ public final class SnowflakeKeyGenerateAlgorithm implements KeyGenerateAlgorithm
private volatile long lastMilliseconds;
+ private volatile InstanceContext instanceContext;
+
static {
Calendar calendar = Calendar.getInstance();
calendar.set(2016, Calendar.NOVEMBER, 1);
@@ -104,21 +100,10 @@ public final class SnowflakeKeyGenerateAlgorithm implements KeyGenerateAlgorithm
@Override
public void setInstanceContext(final InstanceContext instanceContext) {
- workerId = initWorkerId(instanceContext);
- }
-
- private long initWorkerId(final InstanceContext instanceContext) {
- long result = null == instanceContext ? parseWorkerId() : instanceContext.generateWorkerId(props);
- rangeValidate(result);
- return result;
- }
-
- private long parseWorkerId() {
- return null == props ? DEFAULT_WORKER_ID : Long.parseLong(props.getOrDefault(WORKER_ID_KEY, DEFAULT_WORKER_ID).toString());
- }
-
- private void rangeValidate(final long workerId) {
- Preconditions.checkArgument(workerId >= 0L && workerId <= WORKER_ID_MAX_VALUE, "Illegal worker id.");
+ this.instanceContext = instanceContext;
+ if (null != instanceContext) {
+ instanceContext.generateWorkerId(props);
+ }
}
private int getMaxVibrationOffset(final Properties props) {
@@ -146,7 +131,7 @@ public final class SnowflakeKeyGenerateAlgorithm implements KeyGenerateAlgorithm
sequence = sequenceOffset;
}
lastMilliseconds = currentMilliseconds;
- return ((currentMilliseconds - EPOCH) << TIMESTAMP_LEFT_SHIFT_BITS) | (workerId << WORKER_ID_LEFT_SHIFT_BITS) | sequence;
+ return ((currentMilliseconds - EPOCH) << TIMESTAMP_LEFT_SHIFT_BITS) | (getWorkerId() << WORKER_ID_LEFT_SHIFT_BITS) | sequence;
}
@SneakyThrows(InterruptedException.class)
@@ -174,6 +159,10 @@ public final class SnowflakeKeyGenerateAlgorithm implements KeyGenerateAlgorithm
sequenceOffset = sequenceOffset >= maxVibrationOffset ? 0 : sequenceOffset + 1;
}
+ private long getWorkerId() {
+ return null == instanceContext ? DEFAULT_WORKER_ID : instanceContext.getWorkerId();
+ }
+
@Override
public String getType() {
return "SNOWFLAKE";
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/algorithm/keygen/SnowflakeKeyGenerateAlgorithmTest.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/algorithm/keygen/SnowflakeKeyGenerateAlgorithmTest.java
index 0722238bb1d..2122d317c35 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/algorithm/keygen/SnowflakeKeyGenerateAlgorithmTest.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/algorithm/keygen/SnowflakeKeyGenerateAlgorithmTest.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.sharding.algorithm.keygen;
import lombok.SneakyThrows;
+import org.apache.shardingsphere.infra.config.algorithm.InstanceAwareAlgorithm;
import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmConfiguration;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.eventbus.EventBusContext;
@@ -47,6 +48,7 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public final class SnowflakeKeyGenerateAlgorithmTest {
@@ -54,12 +56,23 @@ public final class SnowflakeKeyGenerateAlgorithmTest {
private static final int DEFAULT_KEY_AMOUNT = 10;
+ private static final InstanceContext INSTANCE;
+
+ static {
+ InstanceContext instanceContext = mock(InstanceContext.class);
+ when(instanceContext.getWorkerId()).thenReturn(0L);
+ INSTANCE = instanceContext;
+ }
+
@Test
public void assertGenerateKeyWithMultipleThreads() throws ExecutionException, InterruptedException {
int threadNumber = Runtime.getRuntime().availableProcessors() * 2;
ExecutorService executor = Executors.newFixedThreadPool(threadNumber);
int taskNumber = threadNumber * 4;
KeyGenerateAlgorithm algorithm = KeyGenerateAlgorithmFactory.newInstance(new ShardingSphereAlgorithmConfiguration("SNOWFLAKE", new Properties()));
+ if (algorithm instanceof InstanceAwareAlgorithm) {
+ ((InstanceAwareAlgorithm) algorithm).setInstanceContext(INSTANCE);
+ }
Set<Comparable<?>> actual = new HashSet<>(taskNumber, 1);
for (int i = 0; i < taskNumber; i++) {
actual.add(executor.submit((Callable<Comparable<?>>) algorithm::generateKey).get());
@@ -71,6 +84,9 @@ public final class SnowflakeKeyGenerateAlgorithmTest {
public void assertGenerateKeyWithSingleThread() {
SnowflakeKeyGenerateAlgorithm.setTimeService(new FixedTimeService(1));
KeyGenerateAlgorithm algorithm = KeyGenerateAlgorithmFactory.newInstance(new ShardingSphereAlgorithmConfiguration("SNOWFLAKE", new Properties()));
+ if (algorithm instanceof InstanceAwareAlgorithm) {
+ ((InstanceAwareAlgorithm) algorithm).setInstanceContext(INSTANCE);
+ }
List<Comparable<?>> expected = Arrays.asList(0L, 4194305L, 4194306L, 8388608L, 8388609L, 12582913L, 12582914L, 16777216L, 16777217L, 20971521L);
List<Comparable<?>> actual = new ArrayList<>(DEFAULT_KEY_AMOUNT);
for (int i = 0; i < DEFAULT_KEY_AMOUNT; i++) {
@@ -85,6 +101,9 @@ public final class SnowflakeKeyGenerateAlgorithmTest {
Properties props = new Properties();
props.setProperty("max-vibration-offset", "3");
KeyGenerateAlgorithm algorithm = KeyGenerateAlgorithmFactory.newInstance(new ShardingSphereAlgorithmConfiguration("SNOWFLAKE", props));
+ if (algorithm instanceof InstanceAwareAlgorithm) {
+ ((InstanceAwareAlgorithm) algorithm).setInstanceContext(INSTANCE);
+ }
assertThat(algorithm.generateKey(), is(0L));
assertThat(algorithm.generateKey(), is(1L));
assertThat(algorithm.generateKey(), is(2L));
@@ -98,6 +117,9 @@ public final class SnowflakeKeyGenerateAlgorithmTest {
SnowflakeKeyGenerateAlgorithm.setTimeService(new TimeService());
props.setProperty("max-vibration-offset", String.valueOf(3));
KeyGenerateAlgorithm algorithm = KeyGenerateAlgorithmFactory.newInstance(new ShardingSphereAlgorithmConfiguration("SNOWFLAKE", props));
+ if (algorithm instanceof InstanceAwareAlgorithm) {
+ ((InstanceAwareAlgorithm) algorithm).setInstanceContext(INSTANCE);
+ }
String actualGenerateKey0 = Long.toBinaryString(Long.parseLong(algorithm.generateKey().toString()));
assertThat(Integer.parseInt(actualGenerateKey0.substring(actualGenerateKey0.length() - 3), 2), is(0));
Thread.sleep(2L);
@@ -119,6 +141,9 @@ public final class SnowflakeKeyGenerateAlgorithmTest {
TimeService timeService = new FixedTimeService(1);
SnowflakeKeyGenerateAlgorithm.setTimeService(timeService);
KeyGenerateAlgorithm algorithm = KeyGenerateAlgorithmFactory.newInstance(new ShardingSphereAlgorithmConfiguration("SNOWFLAKE", new Properties()));
+ if (algorithm instanceof InstanceAwareAlgorithm) {
+ ((InstanceAwareAlgorithm) algorithm).setInstanceContext(INSTANCE);
+ }
setLastMilliseconds(algorithm, timeService.getCurrentMillis() + 2);
List<Comparable<?>> expected = Arrays.asList(4194304L, 8388609L, 8388610L, 12582912L, 12582913L, 16777217L, 16777218L, 20971520L, 20971521L, 25165825L);
List<Comparable<?>> actual = new ArrayList<>(DEFAULT_KEY_AMOUNT);
@@ -135,6 +160,9 @@ public final class SnowflakeKeyGenerateAlgorithmTest {
Properties props = new Properties();
props.setProperty("max-tolerate-time-difference-milliseconds", String.valueOf(0));
KeyGenerateAlgorithm algorithm = KeyGenerateAlgorithmFactory.newInstance(new ShardingSphereAlgorithmConfiguration("SNOWFLAKE", props));
+ if (algorithm instanceof InstanceAwareAlgorithm) {
+ ((InstanceAwareAlgorithm) algorithm).setInstanceContext(INSTANCE);
+ }
setLastMilliseconds(algorithm, timeService.getCurrentMillis() + 2);
List<Comparable<?>> actual = new ArrayList<>(DEFAULT_KEY_AMOUNT);
for (int i = 0; i < DEFAULT_KEY_AMOUNT; i++) {
@@ -148,6 +176,9 @@ public final class SnowflakeKeyGenerateAlgorithmTest {
TimeService timeService = new FixedTimeService(2);
SnowflakeKeyGenerateAlgorithm.setTimeService(timeService);
KeyGenerateAlgorithm algorithm = KeyGenerateAlgorithmFactory.newInstance(new ShardingSphereAlgorithmConfiguration("SNOWFLAKE", new Properties()));
+ if (algorithm instanceof InstanceAwareAlgorithm) {
+ ((InstanceAwareAlgorithm) algorithm).setInstanceContext(INSTANCE);
+ }
setLastMilliseconds(algorithm, timeService.getCurrentMillis());
setSequence(algorithm, (1 << DEFAULT_SEQUENCE_BITS) - 1);
List<Comparable<?>> expected = Arrays.asList(4194304L, 4194305L, 4194306L, 8388608L, 8388609L, 8388610L, 12582913L, 12582914L, 12582915L, 16777216L);
@@ -175,9 +206,9 @@ public final class SnowflakeKeyGenerateAlgorithmTest {
@Test(expected = IllegalArgumentException.class)
public void assertSetWorkerIdFailureWhenNegative() {
SnowflakeKeyGenerateAlgorithm algorithm = (SnowflakeKeyGenerateAlgorithm) KeyGenerateAlgorithmFactory.newInstance(new ShardingSphereAlgorithmConfiguration("SNOWFLAKE", new Properties()));
- algorithm.setInstanceContext(new InstanceContext(new ComputeNodeInstance(mock(InstanceMetaData.class)), new WorkerIdGeneratorFixture(-1L),
- new ModeConfiguration("Standalone", null, false), mock(LockContext.class), new EventBusContext()));
- algorithm.init(new Properties());
+ InstanceContext instanceContext = new InstanceContext(new ComputeNodeInstance(mock(InstanceMetaData.class)), new WorkerIdGeneratorFixture(-1L),
+ new ModeConfiguration("Standalone", null, false), mock(LockContext.class), new EventBusContext());
+ algorithm.setInstanceContext(instanceContext);
algorithm.generateKey();
}
@@ -191,8 +222,9 @@ public final class SnowflakeKeyGenerateAlgorithmTest {
@Test(expected = IllegalArgumentException.class)
public void assertSetWorkerIdFailureWhenOutOfRange() {
SnowflakeKeyGenerateAlgorithm algorithm = (SnowflakeKeyGenerateAlgorithm) KeyGenerateAlgorithmFactory.newInstance(new ShardingSphereAlgorithmConfiguration("SNOWFLAKE", new Properties()));
- algorithm.setInstanceContext(new InstanceContext(new ComputeNodeInstance(mock(InstanceMetaData.class)), new WorkerIdGeneratorFixture(Long.MIN_VALUE),
- new ModeConfiguration("Standalone", null, false), mock(LockContext.class), new EventBusContext()));
+ InstanceContext instanceContext = new InstanceContext(new ComputeNodeInstance(mock(InstanceMetaData.class)), new WorkerIdGeneratorFixture(Long.MIN_VALUE),
+ new ModeConfiguration("Standalone", null, false), mock(LockContext.class), new EventBusContext());
+ algorithm.setInstanceContext(instanceContext);
algorithm.generateKey();
}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/algorithm/keygen/fixture/WorkerIdGeneratorFixture.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/algorithm/keygen/fixture/WorkerIdGeneratorFixture.java
index 07d8e765d19..3115fb49f81 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/algorithm/keygen/fixture/WorkerIdGeneratorFixture.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/algorithm/keygen/fixture/WorkerIdGeneratorFixture.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.sharding.algorithm.keygen.fixture;
+import com.google.common.base.Preconditions;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.instance.workerid.WorkerIdGenerator;
@@ -29,6 +30,7 @@ public final class WorkerIdGeneratorFixture implements WorkerIdGenerator {
@Override
public long generate(final Properties props) {
+ Preconditions.checkArgument(fixtureWorkerId >= 0L && fixtureWorkerId <= MAX_WORKER_ID, "Illegal worker id.");
return fixtureWorkerId;
}
}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-plugin/shardingsphere-sharding-cosid/src/main/java/org/apache/shardingsphere/sharding/cosid/algorithm/keygen/CosIdSnowflakeKeyGenerateAlgorithm.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-plugin/shardingsphere-sharding-cosid/src/main/java/org/apache/shardingsphere/sharding/cosid/algorithm/keygen/CosIdSnowflakeKeyGenerateAlgorithm.java
index 48764f7ff23..6f7bb3c0ecb 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-plugin/shardingsphere-sharding-cosid/src/main/java/org/apache/shardingsphere/sharding/cosid/algorithm/keygen/CosIdSnowflakeKeyGenerateAlgorithm.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-plugin/shardingsphere-sharding-cosid/src/main/java/org/apache/shardingsphere/sharding/cosid/algorithm/keygen/CosIdSnowflakeKeyGenerateAlgorithm.java
@@ -43,12 +43,6 @@ public final class CosIdSnowflakeKeyGenerateAlgorithm implements KeyGenerateAlgo
public static final String EPOCH_KEY = "epoch";
- public static final String WORKER_ID_KEY = "worker-id";
-
- public static final long DEFAULT_WORKER_ID = 0L;
-
- public static final long WORKER_ID_MAX_VALUE = 1L << 10;
-
@Getter
private Properties props;
@@ -85,22 +79,12 @@ public final class CosIdSnowflakeKeyGenerateAlgorithm implements KeyGenerateAlgo
@Override
public void setInstanceContext(final InstanceContext instanceContext) {
- long workerId = initWorkerId(instanceContext);
+ long workerId = instanceContext.generateWorkerId(props);
MillisecondSnowflakeId millisecondSnowflakeId =
new MillisecondSnowflakeId(epoch, MillisecondSnowflakeId.DEFAULT_TIMESTAMP_BIT, MillisecondSnowflakeId.DEFAULT_MACHINE_BIT, MillisecondSnowflakeId.DEFAULT_SEQUENCE_BIT, workerId);
snowflakeId = new StringSnowflakeId(new ClockSyncSnowflakeId(millisecondSnowflakeId), Radix62IdConverter.PAD_START);
}
- private long initWorkerId(final InstanceContext instanceContext) {
- long result = null == instanceContext ? Long.parseLong(props.getOrDefault(WORKER_ID_KEY, DEFAULT_WORKER_ID).toString()) : instanceContext.generateWorkerId(props);
- rangeValidate(result);
- return result;
- }
-
- private void rangeValidate(final long workerId) {
- Preconditions.checkArgument(workerId >= 0L && workerId < WORKER_ID_MAX_VALUE, "Illegal worker id.");
- }
-
@Override
public Comparable<?> generateKey() {
if (asString) {
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
index c04b9755ef8..178ce4b151b 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
@@ -41,6 +41,8 @@ public final class ComputeNodeInstance {
private Collection<String> labels = new ArrayList<>();
+ private volatile long workerId;
+
/**
* Set labels.
*
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
index 4405d3be99e..4f37ae6b40a 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
@@ -96,6 +96,15 @@ public final class InstanceContext {
allClusterInstances.stream().filter(each -> each.getMetaData().getId().equals(instanceId)).forEach(each -> each.setLabels(labels));
}
+ /**
+ * Get worker id.
+ *
+ * @return worker id
+ */
+ public long getWorkerId() {
+ return instance.getWorkerId();
+ }
+
/**
* Generate worker id.
*
@@ -103,7 +112,9 @@ public final class InstanceContext {
* @return worker id
*/
public long generateWorkerId(final Properties props) {
- return workerIdGenerator.generate(props);
+ long result = workerIdGenerator.generate(props);
+ instance.setWorkerId(result);
+ return getWorkerId();
}
/**
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/instance/InstanceContextTest.java b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/instance/InstanceContextTest.java
index a5a39dcb099..44af3f094cd 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/instance/InstanceContextTest.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/instance/InstanceContextTest.java
@@ -64,6 +64,14 @@ public final class InstanceContextTest {
assertThat(actual, is(StateType.OK));
}
+ @Test
+ public void assertGetWorkerId() {
+ ComputeNodeInstance computeNodeInstance = mock(ComputeNodeInstance.class);
+ when(computeNodeInstance.getWorkerId()).thenReturn(0L);
+ InstanceContext context = new InstanceContext(computeNodeInstance, new WorkerIdGeneratorFixture(Long.MIN_VALUE), modeConfig, lockContext, eventBusContext);
+ assertThat(context.getWorkerId(), is(0L));
+ }
+
@Test
public void assertGenerateWorkerId() {
InstanceContext context = new InstanceContext(new ComputeNodeInstance(mock(InstanceMetaData.class)), new WorkerIdGeneratorFixture(Long.MIN_VALUE), modeConfig, lockContext, eventBusContext);
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
index 611bd05308a..db8a404c9a1 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
@@ -150,15 +150,18 @@ public final class ComputeNodeStatusService {
}
/**
- * Get used worker ids.
+ * Get assigned worker ids.
*
- * @return used worker ids
+ * @return assigned worker ids
*/
- public Set<Long> getUsedWorkerIds() {
+ public Set<Long> getAssignedWorkerIds() {
Set<Long> result = new LinkedHashSet<>();
List<String> childrenKeys = repository.getChildrenKeys(ComputeNode.getInstanceWorkerIdRootNodePath());
for (String each : childrenKeys) {
- result.add(Long.parseLong(repository.get(ComputeNode.getInstanceWorkerIdNodePath(each))));
+ String workerId = repository.get(ComputeNode.getInstanceWorkerIdNodePath(each));
+ if (null != workerId) {
+ result.add(Long.parseLong(workerId));
+ }
}
return result;
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
index 6b01f3403fd..7af1aa4a96c 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator;
+import com.google.common.base.Preconditions;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.infra.exception.ShardingSphereException;
@@ -24,8 +25,14 @@ import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import org.apache.shardingsphere.infra.instance.workerid.WorkerIdGenerator;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.node.WorkerIdNode;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.PriorityQueue;
import java.util.Properties;
+import java.util.Set;
/**
* Worker id generator for cluster mode.
@@ -34,8 +41,6 @@ import java.util.Properties;
@Slf4j
public final class ClusterWorkerIdGenerator implements WorkerIdGenerator {
- private static final int MAX_RETRY = 3;
-
private final RegistryCenter registryCenter;
private final InstanceMetaData instanceMetaData;
@@ -44,37 +49,42 @@ public final class ClusterWorkerIdGenerator implements WorkerIdGenerator {
@Override
public long generate(final Properties props) {
- long result = registryCenter.getComputeNodeStatusService().loadInstanceWorkerId(instanceMetaData.getId()).orElseGet(this::generate);
+ long result = registryCenter.getComputeNodeStatusService().loadInstanceWorkerId(instanceMetaData.getId()).orElseGet(this::reGenerate);
checkIneffectiveConfiguration(result, props);
return result;
}
- private long generate() {
- long result;
- int retryTimes = 0;
+ private Long reGenerate() {
+ Optional<Long> result;
do {
- retryTimes++;
- result = generateSequentialId();
- if (result > MAX_WORKER_ID) {
- result = result % MAX_WORKER_ID + 1;
- }
- // TODO check may retry should in the front of id generate
- if (retryTimes > MAX_RETRY) {
- throw new ShardingSphereException("System assigned %s failed, assigned %s was %s", WORKER_ID_KEY, WORKER_ID_KEY, result);
- }
- } while (isAssignedWorkerId(result));
- registryCenter.getComputeNodeStatusService().persistInstanceWorkerId(instanceMetaData.getId(), result);
- return result;
- }
-
- private long generateSequentialId() {
- String sequentialId = registryCenter.getRepository().getSequentialId(WorkerIdNode.getWorkerIdGeneratorPath(instanceMetaData.getId()), "");
- // TODO maybe throw exception is better if `null == sequentialId`
- return null == sequentialId ? DEFAULT_WORKER_ID : Long.parseLong(sequentialId);
+ result = generateAvailableWorkerId();
+ } while (!result.isPresent());
+ Long generatedWorkId = result.get();
+ registryCenter.getComputeNodeStatusService().persistInstanceWorkerId(instanceMetaData.getId(), generatedWorkId);
+ return generatedWorkId;
}
- private boolean isAssignedWorkerId(final long workerId) {
- return registryCenter.getComputeNodeStatusService().getUsedWorkerIds().contains(workerId);
+ private Optional<Long> generateAvailableWorkerId() {
+ Set<Long> assignedWorkerIds = registryCenter.getComputeNodeStatusService().getAssignedWorkerIds();
+ if (assignedWorkerIds.size() > 1024) {
+ throw new ShardingSphereException("System assigned %s failed, Illegal max vibration offset.", WORKER_ID_KEY);
+ }
+ Collection<Long> maxAvailableIds = new ArrayList<>(1024);
+ for (int i = 0; i < 1024; i++) {
+ maxAvailableIds.add((long) i);
+ }
+ PriorityQueue<Long> priorityQueue = new PriorityQueue<>(maxAvailableIds);
+ for (Long each : assignedWorkerIds) {
+ priorityQueue.remove(each);
+ }
+ Long preselectedWorkerId = priorityQueue.poll();
+ Preconditions.checkState(null != preselectedWorkerId, "Preselected worker-id can not be null.");
+ try {
+ registryCenter.getRepository().persistEphemeral(WorkerIdNode.getWorkerIdGeneratorPath(preselectedWorkerId.toString()), instanceMetaData.getId());
+ return Optional.of(preselectedWorkerId);
+ } catch (final ClusterPersistRepositoryException ignore) {
+ return Optional.empty();
+ }
}
private void checkIneffectiveConfiguration(final long generatedWorkerId, final Properties props) {
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
index 67f71353a82..bbf40985fcd 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
@@ -122,7 +122,7 @@ public final class ComputeNodeStatusServiceTest {
@Test
public void assertGetUsedWorkerIds() {
- new ComputeNodeStatusService(repository).getUsedWorkerIds();
+ new ComputeNodeStatusService(repository).getAssignedWorkerIds();
verify(repository).getChildrenKeys(ComputeNode.getInstanceWorkerIdRootNodePath());
}
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGeneratorTest.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGeneratorTest.java
index c62d4fde41a..23320c5f1b6 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGeneratorTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGeneratorTest.java
@@ -20,7 +20,11 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.work
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import org.apache.shardingsphere.infra.instance.workerid.WorkerIdGenerator;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.collections.Sets;
+import org.mockito.stubbing.Answer;
import java.util.Optional;
import java.util.Properties;
@@ -49,8 +53,11 @@ public final class ClusterWorkerIdGeneratorTest {
InstanceMetaData instanceMetaData = mock(InstanceMetaData.class);
when(instanceMetaData.getId()).thenReturn("foo_id");
RegistryCenter registryCenter = mock(RegistryCenter.class, RETURNS_DEEP_STUBS);
+ ClusterPersistRepository repository = mock(ClusterPersistRepository.class);
+ Mockito.doAnswer((Answer<Object>) invocation -> "foo_id").when(repository).persistEphemeral("/worker_id/0", "foo_id");
+ when(registryCenter.getRepository()).thenReturn(repository);
when(registryCenter.getComputeNodeStatusService().loadInstanceWorkerId("foo_id")).thenReturn(Optional.empty());
- when(registryCenter.getRepository().getSequentialId("/worker_id/foo_id", "")).thenReturn("100");
- assertThat(new ClusterWorkerIdGenerator(registryCenter, instanceMetaData).generate(new Properties()), is(100L));
+ when(registryCenter.getComputeNodeStatusService().getAssignedWorkerIds()).thenReturn(Sets.newSet(1L));
+ assertThat(new ClusterWorkerIdGenerator(registryCenter, instanceMetaData).generate(new Properties()), is(0L));
}
}