You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/11/16 05:33:12 UTC

[shardingsphere] branch master updated: Refactor pipeline distributed barrier with SPI (#22187)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 858b043440f Refactor pipeline distributed barrier with SPI (#22187)
858b043440f is described below

commit 858b043440fd7dee0f084ede06e5e3e178828d59
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Wed Nov 16 13:32:55 2022 +0800

    Refactor pipeline distributed barrier with SPI (#22187)
    
    * Refactor pipeline distributed barrier with SPI
    
    * Fix codestyle
---
 .../spi/barrier/PipelineDistributedBarrier.java    | 70 ++++++++++++++++++++++
 .../barrier/PipelineDistributedBarrierFactory.java | 44 ++++++++++++++
 .../core/api/impl/AbstractPipelineJobAPIImpl.java  |  5 +-
 .../pipeline/core/job/AbstractPipelineJob.java     |  6 +-
 .../impl/BarrierMetaDataChangedEventHandler.java   |  4 +-
 .../impl/PipelineDistributedBarrierImpl.java}      | 29 +++------
 ...tencyCheckChangedJobConfigurationProcessor.java |  6 +-
 .../MigrationChangedJobConfigurationProcessor.java |  8 +--
 ...pipeline.spi.barrier.PipelineDistributedBarrier | 18 ++++++
 .../fixture/FixturePipelineDistributedBarrier.java | 51 ++++++++++++++++
 ...ava => PipelineDistributedBarrierImplTest.java} | 13 ++--
 ...pipeline.spi.barrier.PipelineDistributedBarrier | 35 +++++++++++
 12 files changed, 244 insertions(+), 45 deletions(-)

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/barrier/PipelineDistributedBarrier.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/barrier/PipelineDistributedBarrier.java
new file mode 100644
index 00000000000..ce2e93c15b9
--- /dev/null
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/barrier/PipelineDistributedBarrier.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.spi.barrier;
+
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Pipeline distributed barrier.
+ */
+@SingletonSPI
+public interface PipelineDistributedBarrier extends RequiredSPI {
+    
+    /**
+     * Register distributed barrier.
+     *
+     * @param barrierPath barrier path
+     * @param totalCount total count
+     */
+    void register(String barrierPath, int totalCount);
+    
+    /**
+     * Persist ephemeral children node.
+     *
+     * @param barrierPath barrier path
+     * @param shardingItem sharding item
+     */
+    void persistEphemeralChildrenNode(String barrierPath, int shardingItem);
+    
+    /**
+     * Unregister distributed barrier.
+     *
+     * @param barrierPath barrier path
+     */
+    void unregister(String barrierPath);
+    
+    /**
+     * Await until all children nodes of the barrier path are ready.
+     *
+     * @param barrierPath barrier path
+     * @param timeout timeout
+     * @param timeUnit time unit
+     * @return true if the count reached zero and false if the waiting time elapsed before the count reached zero
+     */
+    boolean await(String barrierPath, long timeout, TimeUnit timeUnit);
+    
+    /**
+     * Notify children node count check.
+     *
+     * @param nodePath node path
+     */
+    void notifyChildrenNodeCountCheck(String nodePath);
+}
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/barrier/PipelineDistributedBarrierFactory.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/barrier/PipelineDistributedBarrierFactory.java
new file mode 100644
index 00000000000..46033bf6b33
--- /dev/null
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/barrier/PipelineDistributedBarrierFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.spi.barrier;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
+
+/**
+ * Pipeline distributed barrier factory.
+ */
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class PipelineDistributedBarrierFactory {
+    
+    static {
+        ShardingSphereServiceLoader.register(PipelineDistributedBarrier.class);
+    }
+    
+    /**
+     * Get instance of pipeline distributed barrier.
+     *
+     * @return got instance
+     */
+    public static PipelineDistributedBarrier getInstance() {
+        return RequiredSPIRegistry.getRegisteredService(PipelineDistributedBarrier.class);
+    }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index bd72448ba09..765058310ed 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -26,6 +26,8 @@ import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
+import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrierFactory;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
@@ -33,7 +35,6 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHas
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
@@ -56,7 +57,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
     
     protected static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
     
-    private final PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance();
+    private final PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrierFactory.getInstance();
     
     @Override
     public final String marshalJobId(final PipelineJobId pipelineJobId) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 3aa0731dd7a..afbbf8ab8d5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -26,10 +26,10 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrierFactory;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
 
 import java.util.ArrayList;
@@ -58,8 +58,6 @@ public abstract class AbstractPipelineJob implements PipelineJob {
     
     private final Map<Integer, PipelineTasksRunner> tasksRunnerMap = new ConcurrentHashMap<>();
     
-    private final PipelineDistributedBarrier distributedBarrier = PipelineDistributedBarrier.getInstance();
-    
     protected void setJobId(final String jobId) {
         this.jobId = jobId;
         jobAPI = PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobId));
@@ -102,7 +100,7 @@ public abstract class AbstractPipelineJob implements PipelineJob {
             return false;
         }
         PipelineJobProgressPersistService.addJobProgressPersistContext(getJobId(), shardingItem);
-        distributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierEnablePath(getJobId()), shardingItem);
+        PipelineDistributedBarrierFactory.getInstance().persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierEnablePath(getJobId()), shardingItem);
         return true;
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
index cce22a41d9c..9c788a98ee5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
@@ -17,9 +17,9 @@
 
 package org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.impl;
 
+import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrierFactory;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineMetaDataChangedEventHandler;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
 
@@ -38,7 +38,7 @@ public final class BarrierMetaDataChangedEventHandler implements PipelineMetaDat
     @Override
     public void handle(final DataChangedEvent event) {
         if (event.getType() == Type.ADDED) {
-            PipelineDistributedBarrier.getInstance().checkChildrenNodeCount(event);
+            PipelineDistributedBarrierFactory.getInstance().notifyChildrenNodeCountCheck(event.getKey());
         }
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/impl/PipelineDistributedBarrierImpl.java
similarity index 86%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/impl/PipelineDistributedBarrierImpl.java
index bf0553f9e69..eb05f3c323e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/impl/PipelineDistributedBarrierImpl.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.util;
+package org.apache.shardingsphere.data.pipeline.core.spi.impl;
 
 import com.google.common.base.Strings;
 import lombok.Getter;
@@ -24,9 +24,9 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.concurrent.ConcurrentException;
 import org.apache.commons.lang3.concurrent.LazyInitializer;
+import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 
 import java.util.List;
 import java.util.Map;
@@ -38,9 +38,7 @@ import java.util.concurrent.TimeUnit;
  * Pipeline distributed barrier.
  */
 @Slf4j
-public final class PipelineDistributedBarrier {
-    
-    private static final PipelineDistributedBarrier INSTANCE = new PipelineDistributedBarrier();
+public final class PipelineDistributedBarrierImpl implements PipelineDistributedBarrier {
     
     private static final LazyInitializer<ClusterPersistRepository> REPOSITORY_LAZY_INITIALIZER = new LazyInitializer<ClusterPersistRepository>() {
         
@@ -57,15 +55,6 @@ public final class PipelineDistributedBarrier {
         return REPOSITORY_LAZY_INITIALIZER.get();
     }
     
-    /**
-     * Get instance.
-     *
-     * @return instance
-     */
-    public static PipelineDistributedBarrier getInstance() {
-        return INSTANCE;
-    }
-    
     /**
      * Register count down latch.
      *
@@ -129,16 +118,12 @@ public final class PipelineDistributedBarrier {
         return false;
     }
     
-    /**
-     * Check child node count equal sharding count.
-     *
-     * @param event event
-     */
-    public void checkChildrenNodeCount(final DataChangedEvent event) {
-        if (Strings.isNullOrEmpty(event.getKey())) {
+    @Override
+    public void notifyChildrenNodeCountCheck(final String nodePath) {
+        if (Strings.isNullOrEmpty(nodePath)) {
             return;
         }
-        String barrierPath = event.getKey().substring(0, event.getKey().lastIndexOf("/"));
+        String barrierPath = nodePath.substring(0, nodePath.lastIndexOf("/"));
         InnerCountDownLatchHolder holder = countDownLatchMap.get(barrierPath);
         if (null == holder) {
             return;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
index 462b1582119..29fb0b284a6 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
@@ -20,11 +20,11 @@ package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrierFactory;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -38,8 +38,6 @@ import java.util.concurrent.CompletableFuture;
 @Slf4j
 public final class ConsistencyCheckChangedJobConfigurationProcessor implements PipelineChangedJobConfigurationProcessor {
     
-    private final PipelineDistributedBarrier distributedBarrier = PipelineDistributedBarrier.getInstance();
-    
     @Override
     public void process(final DataChangedEvent.Type eventType, final JobConfigurationPOJO jobConfigPOJO) {
         String jobId = jobConfigPOJO.getJobName();
@@ -47,7 +45,7 @@ public final class ConsistencyCheckChangedJobConfigurationProcessor implements P
             Collection<Integer> shardingItems = PipelineJobCenter.getShardingItems(jobId);
             PipelineJobCenter.stop(jobId);
             for (Integer each : shardingItems) {
-                distributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId), each);
+                PipelineDistributedBarrierFactory.getInstance().persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId), each);
             }
             return;
         }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
index 58098ad7ed8..f8607c00700 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
@@ -20,11 +20,12 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
+import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrierFactory;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
@@ -39,16 +40,15 @@ import java.util.concurrent.CompletableFuture;
 @Slf4j
 public final class MigrationChangedJobConfigurationProcessor implements PipelineChangedJobConfigurationProcessor {
     
-    private final PipelineDistributedBarrier distributedBarrier = PipelineDistributedBarrier.getInstance();
-    
     @Override
     public void process(final Type eventType, final JobConfigurationPOJO jobConfigPOJO) {
         String jobId = jobConfigPOJO.getJobName();
         if (jobConfigPOJO.isDisabled()) {
             Collection<Integer> shardingItems = PipelineJobCenter.getShardingItems(jobId);
             PipelineJobCenter.stop(jobId);
+            PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrierFactory.getInstance();
             for (Integer each : shardingItems) {
-                distributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId), each);
+                pipelineDistributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId), each);
             }
             return;
         }
diff --git a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier
new file mode 100644
index 00000000000..abb700ae1d4
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.core.spi.impl.PipelineDistributedBarrierImpl
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineDistributedBarrier.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineDistributedBarrier.java
new file mode 100644
index 00000000000..ac455f045d5
--- /dev/null
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineDistributedBarrier.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.fixture;
+
+import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
+
+import java.util.concurrent.TimeUnit;
+
+public final class FixturePipelineDistributedBarrier implements PipelineDistributedBarrier {
+    
+    @Override
+    public void register(final String barrierPath, final int totalCount) {
+    }
+    
+    @Override
+    public void persistEphemeralChildrenNode(final String barrierPath, final int shardingItem) {
+    }
+    
+    @Override
+    public void unregister(final String barrierPath) {
+    }
+    
+    @Override
+    public boolean await(final String barrierPath, final long timeout, final TimeUnit timeUnit) {
+        return true;
+    }
+    
+    @Override
+    public void notifyChildrenNodeCountCheck(final String nodePath) {
+    }
+    
+    @Override
+    public boolean isDefault() {
+        return true;
+    }
+}
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierImplTest.java
similarity index 84%
rename from test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
rename to test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierImplTest.java
index 31a5231548a..cf3fa8fc9bc 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierImplTest.java
@@ -17,11 +17,10 @@
 
 package org.apache.shardingsphere.data.pipeline.core.util;
 
+import org.apache.shardingsphere.data.pipeline.core.spi.impl.PipelineDistributedBarrierImpl;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.mode.persist.PersistRepository;
-import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
-import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -32,8 +31,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-@SuppressWarnings("rawtypes")
-public final class PipelineDistributedBarrierTest {
+public final class PipelineDistributedBarrierImplTest {
     
     @BeforeClass
     public static void setUp() {
@@ -41,11 +39,12 @@ public final class PipelineDistributedBarrierTest {
     }
     
     @Test
+    @SuppressWarnings("rawtypes")
     public void assertRegisterAndRemove() throws NoSuchFieldException, IllegalAccessException {
         String jobId = "j0130317c3054317c7363616c696e675f626d73716c";
         PersistRepository repository = PipelineContext.getContextManager().getMetaDataContexts().getPersistService().getRepository();
         repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), "");
-        PipelineDistributedBarrier instance = PipelineDistributedBarrier.getInstance();
+        PipelineDistributedBarrierImpl instance = new PipelineDistributedBarrierImpl();
         String parentPath = "/barrier";
         instance.register(parentPath, 1);
         Map countDownLatchMap = ReflectionUtil.getFieldValue(instance, "countDownLatchMap", Map.class);
@@ -60,13 +59,13 @@ public final class PipelineDistributedBarrierTest {
         String jobId = "j0130317c3054317c7363616c696e675f626d73716c";
         PersistRepository repository = PipelineContext.getContextManager().getMetaDataContexts().getPersistService().getRepository();
         repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), "");
-        PipelineDistributedBarrier instance = PipelineDistributedBarrier.getInstance();
+        PipelineDistributedBarrierImpl instance = new PipelineDistributedBarrierImpl();
         String barrierEnablePath = PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
         instance.register(barrierEnablePath, 1);
         instance.persistEphemeralChildrenNode(barrierEnablePath, 1);
         boolean actual = instance.await(barrierEnablePath, 1, TimeUnit.SECONDS);
         assertFalse(actual);
-        instance.checkChildrenNodeCount(new DataChangedEvent(barrierEnablePath + "/0", "", Type.ADDED));
+        instance.notifyChildrenNodeCountCheck(barrierEnablePath + "/0");
         actual = instance.await(barrierEnablePath, 1, TimeUnit.SECONDS);
         assertTrue(actual);
     }
diff --git a/test/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier b/test/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier
new file mode 100644
index 00000000000..61327b06465
--- /dev/null
+++ b/test/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineDistributedBarrier