You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2021/02/05 07:23:45 UTC

[shardingsphere] branch master updated: Add ScalingAPIImplTest (#9342)

This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new af5566b  Add ScalingAPIImplTest (#9342)
af5566b is described below

commit af5566bb90107f84a42944e8da62d301c756852b
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Fri Feb 5 15:23:07 2021 +0800

    Add ScalingAPIImplTest (#9342)
    
    * rename package bitset to distribution
    
    * add ScalingAPIImplTest
    
    Co-authored-by: qiulu3 <Lucas209910>
---
 .../shardingsphere-scaling-core/pom.xml            |   6 ++
 .../scaling/core/api/impl/ScalingAPIImpl.java      |  21 ++--
 .../AbstractBitSetChannel.java                     |   2 +-
 .../AutoAcknowledgeChannel.java                    |   2 +-
 .../{bitset => distribution}/BitSetChannel.java    |   2 +-
 .../BlockingQueueChannel.java                      |   2 +-
 .../{ => distribution}/DistributionChannel.java    |   7 +-
 .../{bitset => distribution}/ManualBitSet.java     |   2 +-
 .../core/job/task/incremental/IncrementalTask.java |   2 +-
 .../scaling/core/api/impl/ScalingAPIImplTest.java  | 112 +++++++++++++++++++++
 .../AutoAcknowledgeChannelTest.java                |   2 +-
 .../DistributionChannelTest.java                   |   3 +-
 .../{bitset => distribution}/ManualBitSetTest.java |   2 +-
 .../scaling/core/fixture/EmbedTestingServer.java   |  74 ++++++++++++++
 14 files changed, 218 insertions(+), 21 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml b/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
index bc5bba1..16b44da 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
@@ -95,5 +95,11 @@
             <artifactId>h2</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
+            <version>${curator.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
index d9777d4..de01d4b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
@@ -29,6 +29,7 @@ import org.apache.shardingsphere.scaling.core.common.constant.ScalingConstant;
 import org.apache.shardingsphere.scaling.core.common.exception.ScalingJobNotFoundException;
 import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
 import org.apache.shardingsphere.scaling.core.job.JobContext;
+import org.apache.shardingsphere.scaling.core.job.JobStatus;
 import org.apache.shardingsphere.scaling.core.job.ScalingJob;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
@@ -44,9 +45,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import java.util.stream.Stream;
 
 @Slf4j
 public final class ScalingAPIImpl implements ScalingAPI {
@@ -76,12 +77,16 @@ public final class ScalingAPIImpl implements ScalingAPI {
     }
     
     private String getStatus(final Map<Integer, JobProgress> jobProgressMap) {
-        Stream<JobProgress> jobPositionStream = jobProgressMap.values().stream()
-                .filter(Objects::nonNull);
-        Optional<JobProgress> jobPositionOptional = jobPositionStream
-                .filter(each -> !each.getStatus().isRunning())
-                .reduce((a, b) -> a);
-        return jobPositionOptional.orElse(jobPositionStream.findAny().orElse(new JobProgress())).getStatus().name();
+        String result = null;
+        Set<JobProgress> jobProgressSet = jobProgressMap.values().stream()
+                .filter(Objects::nonNull)
+                .collect(Collectors.toSet());
+        for (JobProgress each : jobProgressSet) {
+            if (null == result || !each.getStatus().isRunning()) {
+                result = each.getStatus().name();
+            }
+        }
+        return null == result ? JobStatus.RUNNING.name() : result;
     }
     
     private int getInventoryFinishedPercentage(final Map<Integer, JobProgress> jobProgressMap) {
@@ -94,7 +99,7 @@ public final class ScalingAPIImpl implements ScalingAPI {
                 .flatMap(each -> each.getIncrementalTaskProgressMap().values().stream())
                 .filter(each -> each.getPosition() instanceof FinishedPosition)
                 .count();
-        return (int) ((finished * 100 / total) * (jobProgressMap.size() - isNull) / jobProgressMap.size());
+        return total == 0 ? 0 : (int) ((finished * 100 / total) * (jobProgressMap.size() - isNull) / jobProgressMap.size());
     }
     
     private long getIncrementalAverageDelayMilliseconds(final Map<Integer, JobProgress> jobProgressMap) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/AbstractBitSetChannel.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/AbstractBitSetChannel.java
similarity index 96%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/AbstractBitSetChannel.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/AbstractBitSetChannel.java
index 3cc1819..3ed7311 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/AbstractBitSetChannel.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/AbstractBitSetChannel.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.common.channel.bitset;
+package org.apache.shardingsphere.scaling.core.common.channel.distribution;
 
 import lombok.AccessLevel;
 import lombok.Getter;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/AutoAcknowledgeChannel.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/AutoAcknowledgeChannel.java
similarity index 95%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/AutoAcknowledgeChannel.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/AutoAcknowledgeChannel.java
index b632834..1bd6de4 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/AutoAcknowledgeChannel.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/AutoAcknowledgeChannel.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.common.channel.bitset;
+package org.apache.shardingsphere.scaling.core.common.channel.distribution;
 
 import org.apache.shardingsphere.scaling.core.common.record.Record;
 
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/BitSetChannel.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/BitSetChannel.java
similarity index 96%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/BitSetChannel.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/BitSetChannel.java
index fe81702..7389156 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/BitSetChannel.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/BitSetChannel.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.common.channel.bitset;
+package org.apache.shardingsphere.scaling.core.common.channel.distribution;
 
 import org.apache.shardingsphere.scaling.core.common.record.Record;
 
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/BlockingQueueChannel.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/BlockingQueueChannel.java
similarity index 96%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/BlockingQueueChannel.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/BlockingQueueChannel.java
index 1b24cf8..1e42772 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/BlockingQueueChannel.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/BlockingQueueChannel.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.common.channel.bitset;
+package org.apache.shardingsphere.scaling.core.common.channel.distribution;
 
 import org.apache.shardingsphere.scaling.core.common.record.Record;
 import org.apache.shardingsphere.scaling.core.config.ScalingContext;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/DistributionChannel.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/DistributionChannel.java
similarity index 95%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/DistributionChannel.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/DistributionChannel.java
index 9923771..fb2a28b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/DistributionChannel.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/DistributionChannel.java
@@ -15,12 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.common.channel;
+package org.apache.shardingsphere.scaling.core.common.channel.distribution;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.scaling.core.common.channel.bitset.AutoAcknowledgeChannel;
-import org.apache.shardingsphere.scaling.core.common.channel.bitset.BitSetChannel;
-import org.apache.shardingsphere.scaling.core.common.channel.bitset.BlockingQueueChannel;
+import org.apache.shardingsphere.scaling.core.common.channel.AckCallback;
+import org.apache.shardingsphere.scaling.core.common.channel.Channel;
 import org.apache.shardingsphere.scaling.core.common.record.DataRecord;
 import org.apache.shardingsphere.scaling.core.common.record.FinishedRecord;
 import org.apache.shardingsphere.scaling.core.common.record.PlaceholderRecord;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/ManualBitSet.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/ManualBitSet.java
similarity index 98%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/ManualBitSet.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/ManualBitSet.java
index e2c86a4..26e7344 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/ManualBitSet.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/ManualBitSet.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.common.channel.bitset;
+package org.apache.shardingsphere.scaling.core.common.channel.distribution;
 
 import java.util.BitSet;
 import java.util.LinkedList;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTask.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTask.java
index 99e3700..ff2c888 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTask.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTask.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.scaling.core.job.task.incremental;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.scaling.core.common.channel.DistributionChannel;
+import org.apache.shardingsphere.scaling.core.common.channel.distribution.DistributionChannel;
 import org.apache.shardingsphere.scaling.core.common.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.common.exception.ScalingTaskExecuteException;
 import org.apache.shardingsphere.scaling.core.common.record.Record;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
new file mode 100644
index 0000000..6ebb44d
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.api.impl;
+
+import org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration;
+import org.apache.shardingsphere.governance.repository.api.config.GovernanceConfiguration;
+import org.apache.shardingsphere.scaling.core.api.JobInfo;
+import org.apache.shardingsphere.scaling.core.api.ScalingAPI;
+import org.apache.shardingsphere.scaling.core.api.ScalingAPIFactory;
+import org.apache.shardingsphere.scaling.core.config.ScalingContext;
+import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
+import org.apache.shardingsphere.scaling.core.fixture.EmbedTestingServer;
+import org.apache.shardingsphere.scaling.core.job.JobStatus;
+import org.apache.shardingsphere.scaling.core.job.progress.JobProgress;
+import org.apache.shardingsphere.scaling.core.util.JobConfigurationUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public final class ScalingAPIImplTest {
+    
+    private final ScalingAPI scalingAPI = ScalingAPIFactory.getScalingAPI();
+    
+    @BeforeClass
+    public static void init() {
+        EmbedTestingServer.start();
+        ScalingContext.getInstance().init(mockServerConfig());
+    }
+    
+    private static ServerConfiguration mockServerConfig() {
+        ServerConfiguration result = new ServerConfiguration();
+        result.setGovernanceConfig(new GovernanceConfiguration("test",
+                new GovernanceCenterConfiguration("Zookeeper", EmbedTestingServer.getConnectionString(), new Properties()), true));
+        return result;
+    }
+    
+    @Test
+    public void assertStartAndList() {
+        Optional<Long> jobId = scalingAPI.start(JobConfigurationUtil.initJobConfig("/config.json"));
+        assertTrue(jobId.isPresent());
+        JobInfo jobInfo = getNonNullJobInfo(jobId.get());
+        assertTrue(jobInfo.isActive());
+        assertThat(jobInfo.getStatus(), is(JobStatus.RUNNING.name()));
+        assertThat(jobInfo.getTables(), is(new String[]{"ds_0.t1", "ds_0.t2"}));
+        assertThat(jobInfo.getShardingTotalCount(), is(2));
+        assertThat(jobInfo.getInventoryFinishedPercentage(), is(0));
+        assertThat(jobInfo.getIncrementalAverageDelayMilliseconds(), is(-1L));
+    }
+    
+    private Optional<JobInfo> getJobInfo(final long jobId) {
+        return scalingAPI.list().stream()
+                .filter(each -> each.getJobId() == jobId)
+                .reduce((a, b) -> a);
+    }
+    
+    private JobInfo getNonNullJobInfo(final long jobId) {
+        Optional<JobInfo> result = getJobInfo(jobId);
+        assertTrue(result.isPresent());
+        return result.get();
+    }
+    
+    @Test
+    public void assertStartOrStopById() {
+        Optional<Long> jobId = scalingAPI.start(JobConfigurationUtil.initJobConfig("/config.json"));
+        assertTrue(jobId.isPresent());
+        assertTrue(getNonNullJobInfo(jobId.get()).isActive());
+        scalingAPI.stop(jobId.get());
+        assertFalse(getNonNullJobInfo(jobId.get()).isActive());
+        scalingAPI.start(jobId.get());
+        assertTrue(getNonNullJobInfo(jobId.get()).isActive());
+    }
+    
+    @Test
+    public void assertRemove() {
+        Optional<Long> jobId = scalingAPI.start(JobConfigurationUtil.initJobConfig("/config.json"));
+        assertTrue(jobId.isPresent());
+        assertTrue(getJobInfo(jobId.get()).isPresent());
+        scalingAPI.remove(jobId.get());
+        assertFalse(getJobInfo(jobId.get()).isPresent());
+    }
+    
+    @Test
+    public void assertGetProgress() {
+        Optional<Long> jobId = scalingAPI.start(JobConfigurationUtil.initJobConfig("/config.json"));
+        assertTrue(jobId.isPresent());
+        Map<Integer, JobProgress> jobProgressMap = scalingAPI.getProgress(jobId.get());
+        assertThat(jobProgressMap.size(), is(2));
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/AutoAcknowledgeChannelTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/AutoAcknowledgeChannelTest.java
similarity index 95%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/AutoAcknowledgeChannelTest.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/AutoAcknowledgeChannelTest.java
index 320ae7d..d5016b1 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/AutoAcknowledgeChannelTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/AutoAcknowledgeChannelTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.common.channel.bitset;
+package org.apache.shardingsphere.scaling.core.common.channel.distribution;
 
 import org.apache.shardingsphere.scaling.core.common.record.DataRecord;
 import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/DistributionChannelTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/DistributionChannelTest.java
similarity index 96%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/DistributionChannelTest.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/DistributionChannelTest.java
index 3bfe36b..cbdf18b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/DistributionChannelTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/DistributionChannelTest.java
@@ -15,11 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.common.channel;
+package org.apache.shardingsphere.scaling.core.common.channel.distribution;
 
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.SneakyThrows;
+import org.apache.shardingsphere.scaling.core.common.channel.AckCallback;
 import org.apache.shardingsphere.scaling.core.common.record.DataRecord;
 import org.apache.shardingsphere.scaling.core.common.record.FinishedRecord;
 import org.apache.shardingsphere.scaling.core.common.record.PlaceholderRecord;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/ManualBitSetTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/ManualBitSetTest.java
similarity index 97%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/ManualBitSetTest.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/ManualBitSetTest.java
index dac22db..48f069a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/bitset/ManualBitSetTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/common/channel/distribution/ManualBitSetTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.common.channel.bitset;
+package org.apache.shardingsphere.scaling.core.common.channel.distribution;
 
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/EmbedTestingServer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/EmbedTestingServer.java
new file mode 100644
index 0000000..4fc622e
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/EmbedTestingServer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.fixture;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.curator.test.TestingServer;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+
+import java.io.File;
+import java.io.IOException;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class EmbedTestingServer {
+    
+    private static final int PORT = 3181;
+    
+    private static volatile TestingServer testingServer;
+    
+    /**
+     * Start embed zookeeper server.
+     */
+    public static void start() {
+        if (null != testingServer) {
+            return;
+        }
+        try {
+            testingServer = new TestingServer(PORT, new File(String.format("target/test_zk_data/%s/", System.nanoTime())));
+            // CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            // CHECKSTYLE:ON
+            if (!isIgnoredException(ex)) {
+                throw new RuntimeException(ex);
+            }
+        } finally {
+            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+                try {
+                    testingServer.close();
+                } catch (final IOException ignored) {
+                }
+            }));
+        }
+    }
+    
+    private static boolean isIgnoredException(final Throwable cause) {
+        return cause instanceof ConnectionLossException || cause instanceof NoNodeException || cause instanceof NodeExistsException;
+    }
+    
+    /**
+     * Get the connection string.
+     *
+     * @return connection string
+     */
+    public static String getConnectionString() {
+        return "localhost:" + PORT;
+    }
+}