You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/10/26 05:21:46 UTC

[shardingsphere] branch master updated: Improve stop pipeline job, extract method to the upper level (#21750)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 32e4b3ab0e2 Improve stop pipeline job, extract method to the upper level (#21750)
32e4b3ab0e2 is described below

commit 32e4b3ab0e2cc3e4fd090491450455a29b3b49d6
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Wed Oct 26 13:21:33 2022 +0800

    Improve stop pipeline job, extract method to the upper level (#21750)
    
    * Improve job sync to stop, extract method to the upper level
    
    * revise comment
    
    * Fix divide zero
    
    * Fix codestyle
    
    * Fix unexpected error
    
    * Improve
---
 .../core/api/impl/AbstractPipelineJobAPIImpl.java  |  2 +-
 .../pipeline/core/job/AbstractPipelineJob.java     | 27 ++++++++++++++++++++++
 .../data/pipeline/core/task/IncrementalTask.java   |  6 +++--
 .../core/util/PipelineDistributedBarrier.java      |  4 ++++
 ...tencyCheckChangedJobConfigurationProcessor.java |  5 ++++
 .../consistencycheck/ConsistencyCheckJob.java      | 17 ++++----------
 .../ConsistencyCheckJobAPIImpl.java                |  2 +-
 .../MigrationChangedJobConfigurationProcessor.java |  5 ++++
 .../pipeline/scenario/migration/MigrationJob.java  | 18 ++++-----------
 9 files changed, 55 insertions(+), 31 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index ff09f119145..c2d64115917 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -140,7 +140,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
         jobConfigPOJO.getProps().setProperty("stop_time_millis", System.currentTimeMillis() + "");
         PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
         String barrierPath = PipelineMetaDataNode.getJobBarrierDisablePath(jobId);
-        pipelineDistributedBarrier.register(barrierPath, jobConfigPOJO.getShardingTotalCount());
+        pipelineDistributedBarrier.register(barrierPath, 1);
         pipelineDistributedBarrier.await(barrierPath, 5, TimeUnit.SECONDS);
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index c29e4310a55..3b8d5c6f628 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -17,12 +17,17 @@
 
 package org.apache.shardingsphere.data.pipeline.core.job;
 
+import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
@@ -42,8 +47,11 @@ public abstract class AbstractPipelineJob implements PipelineJob {
     @Setter
     private volatile JobBootstrap jobBootstrap;
     
+    @Getter(value = AccessLevel.PRIVATE)
     private final Map<Integer, PipelineTasksRunner> tasksRunnerMap = new ConcurrentHashMap<>();
     
+    private final PipelineDistributedBarrier distributedBarrier = PipelineDistributedBarrier.getInstance();
+    
     protected void runInBackground(final Runnable runnable) {
         new Thread(runnable).start();
     }
@@ -52,4 +60,23 @@ public abstract class AbstractPipelineJob implements PipelineJob {
     public Optional<PipelineTasksRunner> getTasksRunner(final int shardingItem) {
         return Optional.ofNullable(tasksRunnerMap.get(shardingItem));
     }
+    
+    protected void addTaskRunner(final int shardingItem, final PipelineTasksRunner tasksRunner) {
+        tasksRunnerMap.put(shardingItem, tasksRunner);
+        PipelineJobProgressPersistService.addJobProgressPersistContext(getJobId(), shardingItem);
+        distributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierEnablePath(getJobId()), shardingItem);
+    }
+    
+    protected boolean containsTaskRunner(final int shardingItem) {
+        return tasksRunnerMap.containsKey(shardingItem);
+    }
+    
+    protected void clearTaskRunner() {
+        tasksRunnerMap.clear();
+        PipelineJobProgressPersistService.removeJobProgressPersistContext(jobId);
+    }
+    
+    protected Collection<PipelineTasksRunner> getTaskRunners() {
+        return tasksRunnerMap.values();
+    }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 98de46f4a8a..c30f072def6 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -42,6 +42,7 @@ import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDump
 
 import java.util.Collection;
 import java.util.LinkedList;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -82,8 +83,9 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
     private IncrementalTaskProgress createIncrementalTaskProgress(final IngestPosition<?> position, final InventoryIncrementalJobItemProgress jobItemProgress) {
         IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress();
         incrementalTaskProgress.setPosition(position);
-        if (null != jobItemProgress) {
-            incrementalTaskProgress.setIncrementalTaskDelay(jobItemProgress.getIncremental().getIncrementalTaskProgress().getIncrementalTaskDelay());
+        if (null != jobItemProgress && null != jobItemProgress.getIncremental()) {
+            Optional.ofNullable(jobItemProgress.getIncremental().getIncrementalTaskProgress())
+                    .ifPresent(optional -> incrementalTaskProgress.setIncrementalTaskDelay(jobItemProgress.getIncremental().getIncrementalTaskProgress().getIncrementalTaskDelay()));
         }
         return incrementalTaskProgress;
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
index 0e74180f818..c856a565e7c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
@@ -84,6 +84,10 @@ public final class PipelineDistributedBarrier {
      * @param shardingItem sharding item
      */
     public void persistEphemeralChildrenNode(final String parentPath, final int shardingItem) {
+        if (!getRepository().isExisted(parentPath)) {
+            log.info("parent path {} not exist, ignore", parentPath);
+            return;
+        }
         String key = String.join("/", parentPath, Integer.toString(shardingItem));
         getRepository().delete(key);
         getRepository().persistEphemeral(key, "");
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
index 406fe64310c..e2e5564fa34 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
@@ -22,7 +22,9 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -35,12 +37,15 @@ import java.util.concurrent.CompletableFuture;
 @Slf4j
 public final class ConsistencyCheckChangedJobConfigurationProcessor implements PipelineChangedJobConfigurationProcessor {
     
+    private final PipelineDistributedBarrier distributedBarrier = PipelineDistributedBarrier.getInstance();
+    
     @Override
     public void process(final DataChangedEvent.Type eventType, final JobConfigurationPOJO jobConfigPOJO) {
         String jobId = jobConfigPOJO.getJobName();
         if (jobConfigPOJO.isDisabled()) {
             log.info("{} is disabled", jobId);
             PipelineJobCenter.stop(jobId);
+            distributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId), 0);
             return;
         }
         switch (eventType) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 5cc1b0a5066..7012b4690f7 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -23,9 +23,6 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
-import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@@ -38,8 +35,6 @@ public final class ConsistencyCheckJob extends AbstractPipelineJob implements Si
     
     private final ConsistencyCheckJobAPI jobAPI = ConsistencyCheckJobAPIFactory.getInstance();
     
-    private final PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance();
-    
     @Override
     public void execute(final ShardingContext shardingContext) {
         String checkJobId = shardingContext.getJobName();
@@ -52,7 +47,7 @@ public final class ConsistencyCheckJob extends AbstractPipelineJob implements Si
         setJobId(checkJobId);
         ConsistencyCheckJobConfiguration jobConfig = new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
         ConsistencyCheckJobItemContext jobItemContext = new ConsistencyCheckJobItemContext(jobConfig, shardingItem, JobStatus.RUNNING);
-        if (getTasksRunnerMap().containsKey(shardingItem)) {
+        if (containsTaskRunner(shardingItem)) {
             log.warn("tasksRunnerMap contains shardingItem {}, ignore", shardingItem);
             return;
         }
@@ -60,8 +55,7 @@ public final class ConsistencyCheckJob extends AbstractPipelineJob implements Si
         jobAPI.cleanJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem());
         ConsistencyCheckTasksRunner tasksRunner = new ConsistencyCheckTasksRunner(jobItemContext);
         tasksRunner.start();
-        PipelineJobProgressPersistService.addJobProgressPersistContext(checkJobId, shardingContext.getShardingItem());
-        getTasksRunnerMap().put(shardingItem, tasksRunner);
+        addTaskRunner(shardingItem, tasksRunner);
     }
     
     @Override
@@ -74,12 +68,9 @@ public final class ConsistencyCheckJob extends AbstractPipelineJob implements Si
             log.info("stop consistency check job, jobId is null, ignore");
             return;
         }
-        for (PipelineTasksRunner each : getTasksRunnerMap().values()) {
+        for (PipelineTasksRunner each : getTaskRunners()) {
             each.stop();
         }
-        getTasksRunnerMap().clear();
-        String jobBarrierDisablePath = PipelineMetaDataNode.getJobBarrierDisablePath(getJobId());
-        pipelineDistributedBarrier.persistEphemeralChildrenNode(jobBarrierDisablePath, 0);
-        PipelineJobProgressPersistService.removeJobProgressPersistContext(getJobId());
+        clearTaskRunner();
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
index 6c7dad4eca7..c2b4ba02466 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -194,7 +194,7 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
             result.setRemainingSeconds(0L);
         } else {
             long checkedRecordsCount = Math.min(jobItemProgress.getCheckedRecordsCount(), recordsCount);
-            result.setFinishedPercentage((int) (checkedRecordsCount * 100 / recordsCount));
+            result.setFinishedPercentage(0 == recordsCount ? 0 : (int) (checkedRecordsCount * 100 / recordsCount));
             JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(checkJobId);
             Long stopTimeMillis = jobConfigPOJO.isDisabled() ? Long.parseLong(jobConfigPOJO.getProps().getProperty("stop_time_millis")) : null;
             long durationMillis = (null != stopTimeMillis ? stopTimeMillis : System.currentTimeMillis()) - jobItemProgress.getCheckBeginTimeMillis();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
index b0429f58cd2..532a9a86520 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
@@ -18,6 +18,8 @@
 package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
@@ -36,12 +38,15 @@ import java.util.concurrent.CompletableFuture;
 @Slf4j
 public final class MigrationChangedJobConfigurationProcessor implements PipelineChangedJobConfigurationProcessor {
     
+    private final PipelineDistributedBarrier distributedBarrier = PipelineDistributedBarrier.getInstance();
+    
     @Override
     public void process(final Type eventType, final JobConfigurationPOJO jobConfigPOJO) {
         String jobId = jobConfigPOJO.getJobName();
         if (jobConfigPOJO.isDisabled()) {
             log.info("{} is disabled", jobId);
             PipelineJobCenter.stop(jobId);
+            distributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId), 0);
             return;
         }
         switch (eventType) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 0792434e09f..7c354ad2b87 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -27,10 +27,7 @@ import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
 import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
-import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryIncrementalTasksRunner;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@@ -48,8 +45,6 @@ public final class MigrationJob extends AbstractPipelineJob implements SimpleJob
     
     private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
     
-    private final PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance();
-    
     // Shared by all sharding items
     private final MigrationJobPreparer jobPreparer = new MigrationJobPreparer();
     
@@ -67,7 +62,7 @@ public final class MigrationJob extends AbstractPipelineJob implements SimpleJob
         MigrationProcessContext jobProcessContext = jobAPI.buildPipelineProcessContext(jobConfig);
         MigrationTaskConfiguration taskConfig = jobAPI.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
         MigrationJobItemContext jobItemContext = new MigrationJobItemContext(jobConfig, shardingItem, initProgress, jobProcessContext, taskConfig, dataSourceManager);
-        if (getTasksRunnerMap().containsKey(shardingItem)) {
+        if (containsTaskRunner(shardingItem)) {
             log.warn("tasksRunnerMap contains shardingItem {}, ignore", shardingItem);
             return;
         }
@@ -78,9 +73,7 @@ public final class MigrationJob extends AbstractPipelineJob implements SimpleJob
             prepare(jobItemContext);
             tasksRunner.start();
         });
-        getTasksRunnerMap().put(shardingItem, tasksRunner);
-        PipelineJobProgressPersistService.addJobProgressPersistContext(getJobId(), shardingItem);
-        pipelineDistributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierEnablePath(getJobId()), shardingItem);
+        addTaskRunner(shardingItem, tasksRunner);
     }
     
     private void prepare(final MigrationJobItemContext jobItemContext) {
@@ -118,12 +111,9 @@ public final class MigrationJob extends AbstractPipelineJob implements SimpleJob
             return;
         }
         log.info("stop tasks runner, jobId={}", jobId);
-        String jobBarrierDisablePath = PipelineMetaDataNode.getJobBarrierDisablePath(jobId);
-        for (PipelineTasksRunner each : getTasksRunnerMap().values()) {
+        for (PipelineTasksRunner each : getTaskRunners()) {
             each.stop();
-            pipelineDistributedBarrier.persistEphemeralChildrenNode(jobBarrierDisablePath, each.getJobItemContext().getShardingItem());
         }
-        getTasksRunnerMap().clear();
-        PipelineJobProgressPersistService.removeJobProgressPersistContext(jobId);
+        clearTaskRunner();
     }
 }