You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2021/02/05 07:23:45 UTC
[shardingsphere] branch master updated: Add ScalingAPIImplTest
(#9342)
This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new af5566b Add ScalingAPIImplTest (#9342)
af5566b is described below
commit af5566bb90107f84a42944e8da62d301c756852b
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Fri Feb 5 15:23:07 2021 +0800
Add ScalingAPIImplTest (#9342)
* rename package bitset to distribution
* add ScalingAPIImplTest
Co-authored-by: qiulu3 <Lucas209910>
---
.../shardingsphere-scaling-core/pom.xml | 6 ++
.../scaling/core/api/impl/ScalingAPIImpl.java | 21 ++--
.../AbstractBitSetChannel.java | 2 +-
.../AutoAcknowledgeChannel.java | 2 +-
.../{bitset => distribution}/BitSetChannel.java | 2 +-
.../BlockingQueueChannel.java | 2 +-
.../{ => distribution}/DistributionChannel.java | 7 +-
.../{bitset => distribution}/ManualBitSet.java | 2 +-
.../core/job/task/incremental/IncrementalTask.java | 2 +-
.../scaling/core/api/impl/ScalingAPIImplTest.java | 112 +++++++++++++++++++++
.../AutoAcknowledgeChannelTest.java | 2 +-
.../DistributionChannelTest.java | 3 +-
.../{bitset => distribution}/ManualBitSetTest.java | 2 +-
.../scaling/core/fixture/EmbedTestingServer.java | 74 ++++++++++++++
14 files changed, 218 insertions(+), 21 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml b/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
index bc5bba1..16b44da 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
@@ -95,5 +95,11 @@
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <version>${curator.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
index d9777d4..de01d4b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
@@ -29,6 +29,7 @@ import org.apache.shardingsphere.scaling.core.common.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.common.exception.ScalingJobNotFoundException;
import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.job.JobContext;
+import org.apache.shardingsphere.scaling.core.job.JobStatus;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
@@ -44,9 +45,9 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import java.util.stream.Stream;
@Slf4j
public final class ScalingAPIImpl implements ScalingAPI {
@@ -76,12 +77,16 @@ public final class ScalingAPIImpl implements ScalingAPI {
}
private String getStatus(final Map<Integer, JobProgress> jobProgressMap) {
- Stream<JobProgress> jobPositionStream = jobProgressMap.values().stream()
- .filter(Objects::nonNull);
- Optional<JobProgress> jobPositionOptional = jobPositionStream
- .filter(each -> !each.getStatus().isRunning())
- .reduce((a, b) -> a);
- return jobPositionOptional.orElse(jobPositionStream.findAny().orElse(new JobProgress())).getStatus().name();
+ String result = null;
+ Set<JobProgress> jobProgressSet = jobProgressMap.values().stream()
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+ for (JobProgress each : jobProgressSet) {
+ if (null == result || !each.getStatus().isRunning()) {
+ result = each.getStatus().name();
+ }
+ }
+ return null == result ? JobStatus.RUNNING.name() : result;
}
private int getInventoryFinishedPercentage(final Map<Integer, JobProgress> jobProgressMap) {
@@ -94,7 +99,7 @@ public final class ScalingAPIImpl implements ScalingAPI {
.flatMap(each -> each.getIncrementalTaskProgressMap().values().stream())
.filter(each -> each.getPosition() instanceof FinishedPosition)
.count();
- return (int) ((finished * 100 / total) * (jobProgressMap.size() - isNull) / jobProgressMap.size());
+ return total == 0 ? 0 : (int) ((finished * 100 / total) * (jobProgressMap.size() - isNull) / jobProgressMap.size());
}
private long getIncrementalAverageDelayMilliseconds(final Map<Integer, JobProgress> jobProgressMap) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/AbstractBitSetChannel.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/AbstractBitSetChannel.java
similarity index 96%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/AbstractBitSetChannel.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/AbstractBitSetChannel.java
index 3cc1819..3ed7311 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/AbstractBitSetChannel.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/AbstractBitSetChannel.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.common.channel.bitset;
+package org.apache.shardingsphere.scaling.core.common.channel.distribution;
import lombok.AccessLevel;
import lombok.Getter;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/AutoAcknowledgeChannel.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/AutoAcknowledgeChannel.java
similarity index 95%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/AutoAcknowledgeChannel.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/AutoAcknowledgeChannel.java
index b632834..1bd6de4 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/AutoAcknowledgeChannel.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/AutoAcknowledgeChannel.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.common.channel.bitset;
+package org.apache.shardingsphere.scaling.core.common.channel.distribution;
import org.apache.shardingsphere.scaling.core.common.record.Record;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/BitSetChannel.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/BitSetChannel.java
similarity index 96%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/BitSetChannel.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/BitSetChannel.java
index fe81702..7389156 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/BitSetChannel.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/BitSetChannel.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.common.channel.bitset;
+package org.apache.shardingsphere.scaling.core.common.channel.distribution;
import org.apache.shardingsphere.scaling.core.common.record.Record;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/BlockingQueueChannel.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/BlockingQueueChannel.java
similarity index 96%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/BlockingQueueChannel.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/BlockingQueueChannel.java
index 1b24cf8..1e42772 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/BlockingQueueChannel.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/BlockingQueueChannel.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.common.channel.bitset;
+package org.apache.shardingsphere.scaling.core.common.channel.distribution;
import org.apache.shardingsphere.scaling.core.common.record.Record;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/DistributionChannel.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/DistributionChannel.java
similarity index 95%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/DistributionChannel.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/DistributionChannel.java
index 9923771..fb2a28b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/DistributionChannel.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/DistributionChannel.java
@@ -15,12 +15,11 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.common.channel;
+package org.apache.shardingsphere.scaling.core.common.channel.distribution;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.scaling.core.common.channel.bitset.AutoAcknowledgeChannel;
-import org.apache.shardingsphere.scaling.core.common.channel.bitset.BitSetChannel;
-import org.apache.shardingsphere.scaling.core.common.channel.bitset.BlockingQueueChannel;
+import org.apache.shardingsphere.scaling.core.common.channel.AckCallback;
+import org.apache.shardingsphere.scaling.core.common.channel.Channel;
import org.apache.shardingsphere.scaling.core.common.record.DataRecord;
import org.apache.shardingsphere.scaling.core.common.record.FinishedRecord;
import org.apache.shardingsphere.scaling.core.common.record.PlaceholderRecord;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/ManualBitSet.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/ManualBitSet.java
similarity index 98%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/ManualBitSet.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/ManualBitSet.java
index e2c86a4..26e7344 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/ManualBitSet.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/ManualBitSet.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.common.channel.bitset;
+package org.apache.shardingsphere.scaling.core.common.channel.distribution;
import java.util.BitSet;
import java.util.LinkedList;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTask.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTask.java
index 99e3700..ff2c888 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTask.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTask.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.scaling.core.job.task.incremental;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.scaling.core.common.channel.DistributionChannel;
+import org.apache.shardingsphere.scaling.core.common.channel.distribution.DistributionChannel;
import org.apache.shardingsphere.scaling.core.common.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.common.exception.ScalingTaskExecuteException;
import org.apache.shardingsphere.scaling.core.common.record.Record;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
new file mode 100644
index 0000000..6ebb44d
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.api.impl;
+
+import org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration;
+import org.apache.shardingsphere.governance.repository.api.config.GovernanceConfiguration;
+import org.apache.shardingsphere.scaling.core.api.JobInfo;
+import org.apache.shardingsphere.scaling.core.api.ScalingAPI;
+import org.apache.shardingsphere.scaling.core.api.ScalingAPIFactory;
+import org.apache.shardingsphere.scaling.core.config.ScalingContext;
+import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
+import org.apache.shardingsphere.scaling.core.fixture.EmbedTestingServer;
+import org.apache.shardingsphere.scaling.core.job.JobStatus;
+import org.apache.shardingsphere.scaling.core.job.progress.JobProgress;
+import org.apache.shardingsphere.scaling.core.util.JobConfigurationUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public final class ScalingAPIImplTest {
+
+ private final ScalingAPI scalingAPI = ScalingAPIFactory.getScalingAPI();
+
+ @BeforeClass
+ public static void init() {
+ EmbedTestingServer.start();
+ ScalingContext.getInstance().init(mockServerConfig());
+ }
+
+ private static ServerConfiguration mockServerConfig() {
+ ServerConfiguration result = new ServerConfiguration();
+ result.setGovernanceConfig(new GovernanceConfiguration("test",
+ new GovernanceCenterConfiguration("Zookeeper", EmbedTestingServer.getConnectionString(), new Properties()), true));
+ return result;
+ }
+
+ @Test
+ public void assertStartAndList() {
+ Optional<Long> jobId = scalingAPI.start(JobConfigurationUtil.initJobConfig("/config.json"));
+ assertTrue(jobId.isPresent());
+ JobInfo jobInfo = getNonNullJobInfo(jobId.get());
+ assertTrue(jobInfo.isActive());
+ assertThat(jobInfo.getStatus(), is(JobStatus.RUNNING.name()));
+ assertThat(jobInfo.getTables(), is(new String[]{"ds_0.t1", "ds_0.t2"}));
+ assertThat(jobInfo.getShardingTotalCount(), is(2));
+ assertThat(jobInfo.getInventoryFinishedPercentage(), is(0));
+ assertThat(jobInfo.getIncrementalAverageDelayMilliseconds(), is(-1L));
+ }
+
+ private Optional<JobInfo> getJobInfo(final long jobId) {
+ return scalingAPI.list().stream()
+ .filter(each -> each.getJobId() == jobId)
+ .reduce((a, b) -> a);
+ }
+
+ private JobInfo getNonNullJobInfo(final long jobId) {
+ Optional<JobInfo> result = getJobInfo(jobId);
+ assertTrue(result.isPresent());
+ return result.get();
+ }
+
+ @Test
+ public void assertStartOrStopById() {
+ Optional<Long> jobId = scalingAPI.start(JobConfigurationUtil.initJobConfig("/config.json"));
+ assertTrue(jobId.isPresent());
+ assertTrue(getNonNullJobInfo(jobId.get()).isActive());
+ scalingAPI.stop(jobId.get());
+ assertFalse(getNonNullJobInfo(jobId.get()).isActive());
+ scalingAPI.start(jobId.get());
+ assertTrue(getNonNullJobInfo(jobId.get()).isActive());
+ }
+
+ @Test
+ public void assertRemove() {
+ Optional<Long> jobId = scalingAPI.start(JobConfigurationUtil.initJobConfig("/config.json"));
+ assertTrue(jobId.isPresent());
+ assertTrue(getJobInfo(jobId.get()).isPresent());
+ scalingAPI.remove(jobId.get());
+ assertFalse(getJobInfo(jobId.get()).isPresent());
+ }
+
+ @Test
+ public void assertGetProgress() {
+ Optional<Long> jobId = scalingAPI.start(JobConfigurationUtil.initJobConfig("/config.json"));
+ assertTrue(jobId.isPresent());
+ Map<Integer, JobProgress> jobProgressMap = scalingAPI.getProgress(jobId.get());
+ assertThat(jobProgressMap.size(), is(2));
+ }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/AutoAcknowledgeChannelTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/AutoAcknowledgeChannelTest.java
similarity index 95%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/AutoAcknowledgeChannelTest.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/AutoAcknowledgeChannelTest.java
index 320ae7d..d5016b1 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/AutoAcknowledgeChannelTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/AutoAcknowledgeChannelTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.common.channel.bitset;
+package org.apache.shardingsphere.scaling.core.common.channel.distribution;
import org.apache.shardingsphere.scaling.core.common.record.DataRecord;
import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/DistributionChannelTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/DistributionChannelTest.java
similarity index 96%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/DistributionChannelTest.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/DistributionChannelTest.java
index 3bfe36b..cbdf18b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/DistributionChannelTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/DistributionChannelTest.java
@@ -15,11 +15,12 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.common.channel;
+package org.apache.shardingsphere.scaling.core.common.channel.distribution;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.SneakyThrows;
+import org.apache.shardingsphere.scaling.core.common.channel.AckCallback;
import org.apache.shardingsphere.scaling.core.common.record.DataRecord;
import org.apache.shardingsphere.scaling.core.common.record.FinishedRecord;
import org.apache.shardingsphere.scaling.core.common.record.PlaceholderRecord;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/ManualBitSetTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/ManualBitSetTest.java
similarity index 97%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/ManualBitSetTest.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/ManualBitSetTest.java
index dac22db..48f069a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/ManualBitSetTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/ManualBitSetTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.common.channel.bitset;
+package org.apache.shardingsphere.scaling.core.common.channel.distribution;
import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/EmbedTestingServer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/EmbedTestingServer.java
new file mode 100644
index 0000000..4fc622e
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/EmbedTestingServer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.fixture;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.curator.test.TestingServer;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+
+import java.io.File;
+import java.io.IOException;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class EmbedTestingServer {
+
+ private static final int PORT = 3181;
+
+ private static volatile TestingServer testingServer;
+
+ /**
+ * Start embed zookeeper server.
+ */
+ public static void start() {
+ if (null != testingServer) {
+ return;
+ }
+ try {
+ testingServer = new TestingServer(PORT, new File(String.format("target/test_zk_data/%s/", System.nanoTime())));
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ if (!isIgnoredException(ex)) {
+ throw new RuntimeException(ex);
+ }
+ } finally {
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ testingServer.close();
+ } catch (final IOException ignored) {
+ }
+ }));
+ }
+ }
+
+ private static boolean isIgnoredException(final Throwable cause) {
+ return cause instanceof ConnectionLossException || cause instanceof NoNodeException || cause instanceof NodeExistsException;
+ }
+
+ /**
+ * Get the connection string.
+ *
+ * @return connection string
+ */
+ public static String getConnectionString() {
+ return "localhost:" + PORT;
+ }
+}