You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/11/16 05:33:12 UTC
[shardingsphere] branch master updated: Refactor pipeline distributed barrier with SPI (#22187)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 858b043440f Refactor pipeline distributed barrier with SPI (#22187)
858b043440f is described below
commit 858b043440fd7dee0f084ede06e5e3e178828d59
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Wed Nov 16 13:32:55 2022 +0800
Refactor pipeline distributed barrier with SPI (#22187)
* Refactor pipeline distributed barrier with SPI
* Fix codestyle
---
.../spi/barrier/PipelineDistributedBarrier.java | 70 ++++++++++++++++++++++
.../barrier/PipelineDistributedBarrierFactory.java | 44 ++++++++++++++
.../core/api/impl/AbstractPipelineJobAPIImpl.java | 5 +-
.../pipeline/core/job/AbstractPipelineJob.java | 6 +-
.../impl/BarrierMetaDataChangedEventHandler.java | 4 +-
.../impl/PipelineDistributedBarrierImpl.java} | 29 +++------
...tencyCheckChangedJobConfigurationProcessor.java | 6 +-
.../MigrationChangedJobConfigurationProcessor.java | 8 +--
...pipeline.spi.barrier.PipelineDistributedBarrier | 18 ++++++
.../fixture/FixturePipelineDistributedBarrier.java | 51 ++++++++++++++++
...ava => PipelineDistributedBarrierImplTest.java} | 13 ++--
...pipeline.spi.barrier.PipelineDistributedBarrier | 35 +++++++++++
12 files changed, 244 insertions(+), 45 deletions(-)
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/barrier/PipelineDistributedBarrier.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/barrier/PipelineDistributedBarrier.java
new file mode 100644
index 00000000000..ce2e93c15b9
--- /dev/null
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/barrier/PipelineDistributedBarrier.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.spi.barrier;
+
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Pipeline distributed barrier.
+ */
+@SingletonSPI
+public interface PipelineDistributedBarrier extends RequiredSPI {
+
+ /**
+ * Register distributed barrier.
+ *
+ * @param barrierPath barrier path
+ * @param totalCount total count
+ */
+ void register(String barrierPath, int totalCount);
+
+ /**
+ * Persist ephemeral children node.
+ *
+ * @param barrierPath barrier path
+ * @param shardingItem sharding item
+ */
+ void persistEphemeralChildrenNode(String barrierPath, int shardingItem);
+
+ /**
+ * Persist ephemeral children node.
+ *
+ * @param barrierPath barrier path
+ */
+ void unregister(String barrierPath);
+
+ /**
+ * Await barrier path all children node is ready.
+ *
+ * @param barrierPath barrier path
+ * @param timeout timeout
+ * @param timeUnit time unit
+ * @return true if the count reached zero and false if the waiting time elapsed before the count reached zero
+ */
+ boolean await(String barrierPath, long timeout, TimeUnit timeUnit);
+
+ /**
+ * notify children node count check.
+ *
+ * @param nodePath node path
+ */
+ void notifyChildrenNodeCountCheck(String nodePath);
+}
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/barrier/PipelineDistributedBarrierFactory.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/barrier/PipelineDistributedBarrierFactory.java
new file mode 100644
index 00000000000..46033bf6b33
--- /dev/null
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/barrier/PipelineDistributedBarrierFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.spi.barrier;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
+
+/**
+ * Pipeline distributed barrier factory.
+ */
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class PipelineDistributedBarrierFactory {
+
+ static {
+ ShardingSphereServiceLoader.register(PipelineDistributedBarrier.class);
+ }
+
+ /**
+ * Get instance of pipeline distributed barrier.
+ *
+ * @return got instance
+ */
+ public static PipelineDistributedBarrier getInstance() {
+ return RequiredSPIRegistry.getRegisteredService(PipelineDistributedBarrier.class);
+ }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index bd72448ba09..765058310ed 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -26,6 +26,8 @@ import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
+import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrierFactory;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
@@ -33,7 +35,6 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHas
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
@@ -56,7 +57,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
protected static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- private final PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance();
+ private final PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrierFactory.getInstance();
@Override
public final String marshalJobId(final PipelineJobId pipelineJobId) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 3aa0731dd7a..afbbf8ab8d5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -26,10 +26,10 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrierFactory;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
import java.util.ArrayList;
@@ -58,8 +58,6 @@ public abstract class AbstractPipelineJob implements PipelineJob {
private final Map<Integer, PipelineTasksRunner> tasksRunnerMap = new ConcurrentHashMap<>();
- private final PipelineDistributedBarrier distributedBarrier = PipelineDistributedBarrier.getInstance();
-
protected void setJobId(final String jobId) {
this.jobId = jobId;
jobAPI = PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobId));
@@ -102,7 +100,7 @@ public abstract class AbstractPipelineJob implements PipelineJob {
return false;
}
PipelineJobProgressPersistService.addJobProgressPersistContext(getJobId(), shardingItem);
- distributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierEnablePath(getJobId()), shardingItem);
+ PipelineDistributedBarrierFactory.getInstance().persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierEnablePath(getJobId()), shardingItem);
return true;
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
index cce22a41d9c..9c788a98ee5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/BarrierMetaDataChangedEventHandler.java
@@ -17,9 +17,9 @@
package org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.impl;
+import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrierFactory;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineMetaDataChangedEventHandler;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
@@ -38,7 +38,7 @@ public final class BarrierMetaDataChangedEventHandler implements PipelineMetaDat
@Override
public void handle(final DataChangedEvent event) {
if (event.getType() == Type.ADDED) {
- PipelineDistributedBarrier.getInstance().checkChildrenNodeCount(event);
+ PipelineDistributedBarrierFactory.getInstance().notifyChildrenNodeCountCheck(event.getKey());
}
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/impl/PipelineDistributedBarrierImpl.java
similarity index 86%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/impl/PipelineDistributedBarrierImpl.java
index bf0553f9e69..eb05f3c323e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/impl/PipelineDistributedBarrierImpl.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.util;
+package org.apache.shardingsphere.data.pipeline.core.spi.impl;
import com.google.common.base.Strings;
import lombok.Getter;
@@ -24,9 +24,9 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
+import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import java.util.List;
import java.util.Map;
@@ -38,9 +38,7 @@ import java.util.concurrent.TimeUnit;
* Pipeline distributed barrier.
*/
@Slf4j
-public final class PipelineDistributedBarrier {
-
- private static final PipelineDistributedBarrier INSTANCE = new PipelineDistributedBarrier();
+public final class PipelineDistributedBarrierImpl implements PipelineDistributedBarrier {
private static final LazyInitializer<ClusterPersistRepository> REPOSITORY_LAZY_INITIALIZER = new LazyInitializer<ClusterPersistRepository>() {
@@ -57,15 +55,6 @@ public final class PipelineDistributedBarrier {
return REPOSITORY_LAZY_INITIALIZER.get();
}
- /**
- * Get instance.
- *
- * @return instance
- */
- public static PipelineDistributedBarrier getInstance() {
- return INSTANCE;
- }
-
/**
* Register count down latch.
*
@@ -129,16 +118,12 @@ public final class PipelineDistributedBarrier {
return false;
}
- /**
- * Check child node count equal sharding count.
- *
- * @param event event
- */
- public void checkChildrenNodeCount(final DataChangedEvent event) {
- if (Strings.isNullOrEmpty(event.getKey())) {
+ @Override
+ public void notifyChildrenNodeCountCheck(final String nodePath) {
+ if (Strings.isNullOrEmpty(nodePath)) {
return;
}
- String barrierPath = event.getKey().substring(0, event.getKey().lastIndexOf("/"));
+ String barrierPath = nodePath.substring(0, nodePath.lastIndexOf("/"));
InnerCountDownLatchHolder holder = countDownLatchMap.get(barrierPath);
if (null == holder) {
return;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
index 462b1582119..29fb0b284a6 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
@@ -20,11 +20,11 @@ package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrierFactory;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -38,8 +38,6 @@ import java.util.concurrent.CompletableFuture;
@Slf4j
public final class ConsistencyCheckChangedJobConfigurationProcessor implements PipelineChangedJobConfigurationProcessor {
- private final PipelineDistributedBarrier distributedBarrier = PipelineDistributedBarrier.getInstance();
-
@Override
public void process(final DataChangedEvent.Type eventType, final JobConfigurationPOJO jobConfigPOJO) {
String jobId = jobConfigPOJO.getJobName();
@@ -47,7 +45,7 @@ public final class ConsistencyCheckChangedJobConfigurationProcessor implements P
Collection<Integer> shardingItems = PipelineJobCenter.getShardingItems(jobId);
PipelineJobCenter.stop(jobId);
for (Integer each : shardingItems) {
- distributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId), each);
+ PipelineDistributedBarrierFactory.getInstance().persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId), each);
}
return;
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
index 58098ad7ed8..f8607c00700 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
@@ -20,11 +20,12 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
+import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrierFactory;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
@@ -39,16 +40,15 @@ import java.util.concurrent.CompletableFuture;
@Slf4j
public final class MigrationChangedJobConfigurationProcessor implements PipelineChangedJobConfigurationProcessor {
- private final PipelineDistributedBarrier distributedBarrier = PipelineDistributedBarrier.getInstance();
-
@Override
public void process(final Type eventType, final JobConfigurationPOJO jobConfigPOJO) {
String jobId = jobConfigPOJO.getJobName();
if (jobConfigPOJO.isDisabled()) {
Collection<Integer> shardingItems = PipelineJobCenter.getShardingItems(jobId);
PipelineJobCenter.stop(jobId);
+ PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrierFactory.getInstance();
for (Integer each : shardingItems) {
- distributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId), each);
+ pipelineDistributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId), each);
}
return;
}
diff --git a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier
new file mode 100644
index 00000000000..abb700ae1d4
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.core.spi.impl.PipelineDistributedBarrierImpl
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineDistributedBarrier.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineDistributedBarrier.java
new file mode 100644
index 00000000000..ac455f045d5
--- /dev/null
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineDistributedBarrier.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.fixture;
+
+import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
+
+import java.util.concurrent.TimeUnit;
+
+public final class FixturePipelineDistributedBarrier implements PipelineDistributedBarrier {
+
+ @Override
+ public void register(final String barrierPath, final int totalCount) {
+ }
+
+ @Override
+ public void persistEphemeralChildrenNode(final String barrierPath, final int shardingItem) {
+ }
+
+ @Override
+ public void unregister(final String barrierPath) {
+ }
+
+ @Override
+ public boolean await(final String barrierPath, final long timeout, final TimeUnit timeUnit) {
+ return true;
+ }
+
+ @Override
+ public void notifyChildrenNodeCountCheck(final String nodePath) {
+ }
+
+ @Override
+ public boolean isDefault() {
+ return true;
+ }
+}
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierImplTest.java
similarity index 84%
rename from test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
rename to test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierImplTest.java
index 31a5231548a..cf3fa8fc9bc 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierImplTest.java
@@ -17,11 +17,10 @@
package org.apache.shardingsphere.data.pipeline.core.util;
+import org.apache.shardingsphere.data.pipeline.core.spi.impl.PipelineDistributedBarrierImpl;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.mode.persist.PersistRepository;
-import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
-import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -32,8 +31,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-@SuppressWarnings("rawtypes")
-public final class PipelineDistributedBarrierTest {
+public final class PipelineDistributedBarrierImplTest {
@BeforeClass
public static void setUp() {
@@ -41,11 +39,12 @@ public final class PipelineDistributedBarrierTest {
}
@Test
+ @SuppressWarnings("rawtypes")
public void assertRegisterAndRemove() throws NoSuchFieldException, IllegalAccessException {
String jobId = "j0130317c3054317c7363616c696e675f626d73716c";
PersistRepository repository = PipelineContext.getContextManager().getMetaDataContexts().getPersistService().getRepository();
repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), "");
- PipelineDistributedBarrier instance = PipelineDistributedBarrier.getInstance();
+ PipelineDistributedBarrierImpl instance = new PipelineDistributedBarrierImpl();
String parentPath = "/barrier";
instance.register(parentPath, 1);
Map countDownLatchMap = ReflectionUtil.getFieldValue(instance, "countDownLatchMap", Map.class);
@@ -60,13 +59,13 @@ public final class PipelineDistributedBarrierTest {
String jobId = "j0130317c3054317c7363616c696e675f626d73716c";
PersistRepository repository = PipelineContext.getContextManager().getMetaDataContexts().getPersistService().getRepository();
repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), "");
- PipelineDistributedBarrier instance = PipelineDistributedBarrier.getInstance();
+ PipelineDistributedBarrierImpl instance = new PipelineDistributedBarrierImpl();
String barrierEnablePath = PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
instance.register(barrierEnablePath, 1);
instance.persistEphemeralChildrenNode(barrierEnablePath, 1);
boolean actual = instance.await(barrierEnablePath, 1, TimeUnit.SECONDS);
assertFalse(actual);
- instance.checkChildrenNodeCount(new DataChangedEvent(barrierEnablePath + "/0", "", Type.ADDED));
+ instance.notifyChildrenNodeCountCheck(barrierEnablePath + "/0");
actual = instance.await(barrierEnablePath, 1, TimeUnit.SECONDS);
assertTrue(actual);
}
diff --git a/test/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier b/test/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier
new file mode 100644
index 00000000000..61327b06465
--- /dev/null
+++ b/test/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineDistributedBarrier