You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/10/26 05:21:46 UTC
[shardingsphere] branch master updated: Improve stop pipeline job, extract method to the upper level (#21750)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 32e4b3ab0e2 Improve stop pipeline job, extract method to the upper level (#21750)
32e4b3ab0e2 is described below
commit 32e4b3ab0e2cc3e4fd090491450455a29b3b49d6
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Wed Oct 26 13:21:33 2022 +0800
Improve stop pipeline job, extract method to the upper level (#21750)
* Improve job sync to stop, extract method to the upper level
* revise comment
* Fix divide zero
* Fix codestyle
* Fix unexpected error
* Improve
---
.../core/api/impl/AbstractPipelineJobAPIImpl.java | 2 +-
.../pipeline/core/job/AbstractPipelineJob.java | 27 ++++++++++++++++++++++
.../data/pipeline/core/task/IncrementalTask.java | 6 +++--
.../core/util/PipelineDistributedBarrier.java | 4 ++++
...tencyCheckChangedJobConfigurationProcessor.java | 5 ++++
.../consistencycheck/ConsistencyCheckJob.java | 17 ++++----------
.../ConsistencyCheckJobAPIImpl.java | 2 +-
.../MigrationChangedJobConfigurationProcessor.java | 5 ++++
.../pipeline/scenario/migration/MigrationJob.java | 18 ++++-----------
9 files changed, 55 insertions(+), 31 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index ff09f119145..c2d64115917 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -140,7 +140,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
jobConfigPOJO.getProps().setProperty("stop_time_millis", System.currentTimeMillis() + "");
PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
String barrierPath = PipelineMetaDataNode.getJobBarrierDisablePath(jobId);
- pipelineDistributedBarrier.register(barrierPath, jobConfigPOJO.getShardingTotalCount());
+ pipelineDistributedBarrier.register(barrierPath, 1);
pipelineDistributedBarrier.await(barrierPath, 5, TimeUnit.SECONDS);
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index c29e4310a55..3b8d5c6f628 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -17,12 +17,17 @@
package org.apache.shardingsphere.data.pipeline.core.job;
+import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
+import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
@@ -42,8 +47,11 @@ public abstract class AbstractPipelineJob implements PipelineJob {
@Setter
private volatile JobBootstrap jobBootstrap;
+ @Getter(value = AccessLevel.PRIVATE)
private final Map<Integer, PipelineTasksRunner> tasksRunnerMap = new ConcurrentHashMap<>();
+ private final PipelineDistributedBarrier distributedBarrier = PipelineDistributedBarrier.getInstance();
+
protected void runInBackground(final Runnable runnable) {
new Thread(runnable).start();
}
@@ -52,4 +60,23 @@ public abstract class AbstractPipelineJob implements PipelineJob {
public Optional<PipelineTasksRunner> getTasksRunner(final int shardingItem) {
return Optional.ofNullable(tasksRunnerMap.get(shardingItem));
}
+
+ protected void addTaskRunner(final int shardingItem, final PipelineTasksRunner tasksRunner) {
+ tasksRunnerMap.put(shardingItem, tasksRunner);
+ PipelineJobProgressPersistService.addJobProgressPersistContext(getJobId(), shardingItem);
+ distributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierEnablePath(getJobId()), shardingItem);
+ }
+
+ protected boolean containsTaskRunner(final int shardingItem) {
+ return tasksRunnerMap.containsKey(shardingItem);
+ }
+
+ protected void clearTaskRunner() {
+ tasksRunnerMap.clear();
+ PipelineJobProgressPersistService.removeJobProgressPersistContext(jobId);
+ }
+
+ protected Collection<PipelineTasksRunner> getTaskRunners() {
+ return tasksRunnerMap.values();
+ }
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 98de46f4a8a..c30f072def6 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -42,6 +42,7 @@ import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDump
import java.util.Collection;
import java.util.LinkedList;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
/**
@@ -82,8 +83,9 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
private IncrementalTaskProgress createIncrementalTaskProgress(final IngestPosition<?> position, final InventoryIncrementalJobItemProgress jobItemProgress) {
IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress();
incrementalTaskProgress.setPosition(position);
- if (null != jobItemProgress) {
- incrementalTaskProgress.setIncrementalTaskDelay(jobItemProgress.getIncremental().getIncrementalTaskProgress().getIncrementalTaskDelay());
+ if (null != jobItemProgress && null != jobItemProgress.getIncremental()) {
+ Optional.ofNullable(jobItemProgress.getIncremental().getIncrementalTaskProgress())
+ .ifPresent(optional -> incrementalTaskProgress.setIncrementalTaskDelay(jobItemProgress.getIncremental().getIncrementalTaskProgress().getIncrementalTaskDelay()));
}
return incrementalTaskProgress;
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
index 0e74180f818..c856a565e7c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
@@ -84,6 +84,10 @@ public final class PipelineDistributedBarrier {
* @param shardingItem sharding item
*/
public void persistEphemeralChildrenNode(final String parentPath, final int shardingItem) {
+ if (!getRepository().isExisted(parentPath)) {
+ log.info("parent path {} not exist, ignore", parentPath);
+ return;
+ }
String key = String.join("/", parentPath, Integer.toString(shardingItem));
getRepository().delete(key);
getRepository().persistEphemeral(key, "");
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
index 406fe64310c..e2e5564fa34 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
@@ -22,7 +22,9 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -35,12 +37,15 @@ import java.util.concurrent.CompletableFuture;
@Slf4j
public final class ConsistencyCheckChangedJobConfigurationProcessor implements PipelineChangedJobConfigurationProcessor {
+ private final PipelineDistributedBarrier distributedBarrier = PipelineDistributedBarrier.getInstance();
+
@Override
public void process(final DataChangedEvent.Type eventType, final JobConfigurationPOJO jobConfigPOJO) {
String jobId = jobConfigPOJO.getJobName();
if (jobConfigPOJO.isDisabled()) {
log.info("{} is disabled", jobId);
PipelineJobCenter.stop(jobId);
+ distributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId), 0);
return;
}
switch (eventType) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 5cc1b0a5066..7012b4690f7 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -23,9 +23,6 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
-import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@@ -38,8 +35,6 @@ public final class ConsistencyCheckJob extends AbstractPipelineJob implements Si
private final ConsistencyCheckJobAPI jobAPI = ConsistencyCheckJobAPIFactory.getInstance();
- private final PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance();
-
@Override
public void execute(final ShardingContext shardingContext) {
String checkJobId = shardingContext.getJobName();
@@ -52,7 +47,7 @@ public final class ConsistencyCheckJob extends AbstractPipelineJob implements Si
setJobId(checkJobId);
ConsistencyCheckJobConfiguration jobConfig = new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
ConsistencyCheckJobItemContext jobItemContext = new ConsistencyCheckJobItemContext(jobConfig, shardingItem, JobStatus.RUNNING);
- if (getTasksRunnerMap().containsKey(shardingItem)) {
+ if (containsTaskRunner(shardingItem)) {
log.warn("tasksRunnerMap contains shardingItem {}, ignore", shardingItem);
return;
}
@@ -60,8 +55,7 @@ public final class ConsistencyCheckJob extends AbstractPipelineJob implements Si
jobAPI.cleanJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem());
ConsistencyCheckTasksRunner tasksRunner = new ConsistencyCheckTasksRunner(jobItemContext);
tasksRunner.start();
- PipelineJobProgressPersistService.addJobProgressPersistContext(checkJobId, shardingContext.getShardingItem());
- getTasksRunnerMap().put(shardingItem, tasksRunner);
+ addTaskRunner(shardingItem, tasksRunner);
}
@Override
@@ -74,12 +68,9 @@ public final class ConsistencyCheckJob extends AbstractPipelineJob implements Si
log.info("stop consistency check job, jobId is null, ignore");
return;
}
- for (PipelineTasksRunner each : getTasksRunnerMap().values()) {
+ for (PipelineTasksRunner each : getTaskRunners()) {
each.stop();
}
- getTasksRunnerMap().clear();
- String jobBarrierDisablePath = PipelineMetaDataNode.getJobBarrierDisablePath(getJobId());
- pipelineDistributedBarrier.persistEphemeralChildrenNode(jobBarrierDisablePath, 0);
- PipelineJobProgressPersistService.removeJobProgressPersistContext(getJobId());
+ clearTaskRunner();
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
index 6c7dad4eca7..c2b4ba02466 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -194,7 +194,7 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
result.setRemainingSeconds(0L);
} else {
long checkedRecordsCount = Math.min(jobItemProgress.getCheckedRecordsCount(), recordsCount);
- result.setFinishedPercentage((int) (checkedRecordsCount * 100 / recordsCount));
+ result.setFinishedPercentage(0 == recordsCount ? 0 : (int) (checkedRecordsCount * 100 / recordsCount));
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(checkJobId);
Long stopTimeMillis = jobConfigPOJO.isDisabled() ? Long.parseLong(jobConfigPOJO.getProps().getProperty("stop_time_millis")) : null;
long durationMillis = (null != stopTimeMillis ? stopTimeMillis : System.currentTimeMillis()) - jobItemProgress.getCheckBeginTimeMillis();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
index b0429f58cd2..532a9a86520 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
@@ -18,6 +18,8 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
@@ -36,12 +38,15 @@ import java.util.concurrent.CompletableFuture;
@Slf4j
public final class MigrationChangedJobConfigurationProcessor implements PipelineChangedJobConfigurationProcessor {
+ private final PipelineDistributedBarrier distributedBarrier = PipelineDistributedBarrier.getInstance();
+
@Override
public void process(final Type eventType, final JobConfigurationPOJO jobConfigPOJO) {
String jobId = jobConfigPOJO.getJobName();
if (jobConfigPOJO.isDisabled()) {
log.info("{} is disabled", jobId);
PipelineJobCenter.stop(jobId);
+ distributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId), 0);
return;
}
switch (eventType) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 0792434e09f..7c354ad2b87 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -27,10 +27,7 @@ import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
-import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryIncrementalTasksRunner;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@@ -48,8 +45,6 @@ public final class MigrationJob extends AbstractPipelineJob implements SimpleJob
private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
- private final PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance();
-
// Shared by all sharding items
private final MigrationJobPreparer jobPreparer = new MigrationJobPreparer();
@@ -67,7 +62,7 @@ public final class MigrationJob extends AbstractPipelineJob implements SimpleJob
MigrationProcessContext jobProcessContext = jobAPI.buildPipelineProcessContext(jobConfig);
MigrationTaskConfiguration taskConfig = jobAPI.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
MigrationJobItemContext jobItemContext = new MigrationJobItemContext(jobConfig, shardingItem, initProgress, jobProcessContext, taskConfig, dataSourceManager);
- if (getTasksRunnerMap().containsKey(shardingItem)) {
+ if (containsTaskRunner(shardingItem)) {
log.warn("tasksRunnerMap contains shardingItem {}, ignore", shardingItem);
return;
}
@@ -78,9 +73,7 @@ public final class MigrationJob extends AbstractPipelineJob implements SimpleJob
prepare(jobItemContext);
tasksRunner.start();
});
- getTasksRunnerMap().put(shardingItem, tasksRunner);
- PipelineJobProgressPersistService.addJobProgressPersistContext(getJobId(), shardingItem);
- pipelineDistributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierEnablePath(getJobId()), shardingItem);
+ addTaskRunner(shardingItem, tasksRunner);
}
private void prepare(final MigrationJobItemContext jobItemContext) {
@@ -118,12 +111,9 @@ public final class MigrationJob extends AbstractPipelineJob implements SimpleJob
return;
}
log.info("stop tasks runner, jobId={}", jobId);
- String jobBarrierDisablePath = PipelineMetaDataNode.getJobBarrierDisablePath(jobId);
- for (PipelineTasksRunner each : getTasksRunnerMap().values()) {
+ for (PipelineTasksRunner each : getTaskRunners()) {
each.stop();
- pipelineDistributedBarrier.persistEphemeralChildrenNode(jobBarrierDisablePath, each.getJobItemContext().getShardingItem());
}
- getTasksRunnerMap().clear();
- PipelineJobProgressPersistService.removeJobProgressPersistContext(jobId);
+ clearTaskRunner();
}
}