You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by az...@apache.org on 2023/06/09 07:38:34 UTC

[shardingsphere] branch master updated: Fix execute engine not closed after job stopping (#26212)

This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 9389d93f340 Fix execute engine not closed after job stopping (#26212)
9389d93f340 is described below

commit 9389d93f340dc71618ed6e4884e15efaa24e09fb
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Fri Jun 9 15:38:24 2023 +0800

    Fix execute engine not closed after job stopping (#26212)
    
    * Fix execute engine not closed after job stopping
    
    * Implement getJobProcessContext() for consistency check
    
    * Manage consistency check execute engine in process context
    
    * Shut down execute engine after all runnables complete
    
    * Block on ConsistencyCheckTasksRunner.start
    
    * Shutdown thread pool for CDC job
    
    * Refactor resource cleanup
---
 .../data/pipeline/cdc/core/job/CDCJob.java         | 14 +++++
 .../api/context/PipelineProcessContext.java        |  2 +-
 ...AbstractInventoryIncrementalProcessContext.java | 33 +++++++----
 .../data/pipeline/core/execute/ExecuteEngine.java  | 21 +++++++
 .../pipeline/core/job/AbstractPipelineJob.java     |  6 +-
 .../core/job/AbstractSimplePipelineJob.java        | 15 +++++
 .../util/PipelineLazyInitializer.java}             | 33 +++++++----
 .../context/ConsistencyCheckJobItemContext.java    |  5 +-
 .../context/ConsistencyCheckProcessContext.java    | 68 ++++++++++++++++++++++
 .../task/ConsistencyCheckTasksRunner.java          |  9 ++-
 10 files changed, 178 insertions(+), 28 deletions(-)

diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index fbfdfe806a6..3a4e490538b 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -73,6 +73,14 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob {
     
     @Override
     public void execute(final ShardingContext shardingContext) {
+        try {
+            execute0(shardingContext);
+        } finally {
+            clean();
+        }
+    }
+    
+    private void execute0(final ShardingContext shardingContext) {
         String jobId = shardingContext.getJobName();
         log.info("Execute job {}", jobId);
         CDCJobConfiguration jobConfig = new YamlCDCJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
@@ -100,6 +108,12 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob {
         executeIncrementalTasks(jobItemContexts);
     }
     
+    private void clean() {
+        for (PipelineTasksRunner each : getTasksRunners()) {
+            CloseUtils.closeQuietly(each.getJobItemContext().getJobProcessContext());
+        }
+    }
+    
     private CDCJobItemContext buildPipelineJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem) {
         Optional<InventoryIncrementalJobItemProgress> initProgress = jobAPI.getJobItemProgress(jobConfig.getJobId(), shardingItem);
         CDCProcessContext jobProcessContext = jobAPI.buildPipelineProcessContext(jobConfig);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
index 9b41f930bd6..3ba3bafb562 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
@@ -22,7 +22,7 @@ import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProces
 /**
  * Pipeline process context.
  */
-public interface PipelineProcessContext {
+public interface PipelineProcessContext extends AutoCloseable {
     
     /**
      * Get pipeline process config.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractInventoryIncrementalProcessContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractInventoryIncrementalProcessContext.java
index e17877a2304..5eb97214c06 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractInventoryIncrementalProcessContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractInventoryIncrementalProcessContext.java
@@ -20,12 +20,12 @@ package org.apache.shardingsphere.data.pipeline.core.context;
 import lombok.Getter;
 import lombok.SneakyThrows;
 import org.apache.commons.lang3.concurrent.ConcurrentException;
-import org.apache.commons.lang3.concurrent.LazyInitializer;
 import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineReadConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineWriteConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtils;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineLazyInitializer;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
@@ -45,11 +45,11 @@ public abstract class AbstractInventoryIncrementalProcessContext implements Inve
     
     private final PipelineChannelCreator pipelineChannelCreator;
     
-    private final LazyInitializer<ExecuteEngine> inventoryDumperExecuteEngineLazyInitializer;
+    private final PipelineLazyInitializer<ExecuteEngine> inventoryDumperExecuteEngineLazyInitializer;
     
-    private final LazyInitializer<ExecuteEngine> inventoryImporterExecuteEngineLazyInitializer;
+    private final PipelineLazyInitializer<ExecuteEngine> inventoryImporterExecuteEngineLazyInitializer;
     
-    private final LazyInitializer<ExecuteEngine> incrementalExecuteEngineLazyInitializer;
+    private final PipelineLazyInitializer<ExecuteEngine> incrementalExecuteEngineLazyInitializer;
     
     protected AbstractInventoryIncrementalProcessContext(final String jobId, final PipelineProcessConfiguration originalProcessConfig) {
         PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(originalProcessConfig);
@@ -62,24 +62,24 @@ public abstract class AbstractInventoryIncrementalProcessContext implements Inve
         writeRateLimitAlgorithm = null == writeRateLimiter ? null : TypedSPILoader.getService(JobRateLimitAlgorithm.class, writeRateLimiter.getType(), writeRateLimiter.getProps());
         AlgorithmConfiguration streamChannel = processConfig.getStreamChannel();
         pipelineChannelCreator = TypedSPILoader.getService(PipelineChannelCreator.class, streamChannel.getType(), streamChannel.getProps());
-        inventoryDumperExecuteEngineLazyInitializer = new LazyInitializer<ExecuteEngine>() {
+        inventoryDumperExecuteEngineLazyInitializer = new PipelineLazyInitializer<ExecuteEngine>() {
             
             @Override
-            protected ExecuteEngine initialize() {
+            protected ExecuteEngine doInitialize() {
                 return ExecuteEngine.newFixedThreadInstance(readConfig.getWorkerThread(), "Inventory-" + jobId);
             }
         };
-        inventoryImporterExecuteEngineLazyInitializer = new LazyInitializer<ExecuteEngine>() {
+        inventoryImporterExecuteEngineLazyInitializer = new PipelineLazyInitializer<ExecuteEngine>() {
             
             @Override
-            protected ExecuteEngine initialize() {
+            protected ExecuteEngine doInitialize() {
                 return ExecuteEngine.newFixedThreadInstance(writeConfig.getWorkerThread(), "Importer-" + jobId);
             }
         };
-        incrementalExecuteEngineLazyInitializer = new LazyInitializer<ExecuteEngine>() {
+        incrementalExecuteEngineLazyInitializer = new PipelineLazyInitializer<ExecuteEngine>() {
             
             @Override
-            protected ExecuteEngine initialize() {
+            protected ExecuteEngine doInitialize() {
                 return ExecuteEngine.newCachedThreadInstance("Incremental-" + jobId);
             }
         };
@@ -106,4 +106,17 @@ public abstract class AbstractInventoryIncrementalProcessContext implements Inve
     public ExecuteEngine getIncrementalExecuteEngine() {
         return incrementalExecuteEngineLazyInitializer.get();
     }
+    
+    @Override
+    public void close() throws Exception {
+        shutdownExecuteEngine(inventoryDumperExecuteEngineLazyInitializer);
+        shutdownExecuteEngine(inventoryImporterExecuteEngineLazyInitializer);
+        shutdownExecuteEngine(incrementalExecuteEngineLazyInitializer);
+    }
+    
+    private void shutdownExecuteEngine(final PipelineLazyInitializer<ExecuteEngine> lazyInitializer) throws ConcurrentException {
+        if (lazyInitializer.isInitialized()) {
+            lazyInitializer.get().shutdown();
+        }
+    }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngine.java
index 1d50fd4deec..629a4844a9d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngine.java
@@ -86,6 +86,27 @@ public final class ExecuteEngine {
         }, executorService);
     }
     
+    /**
+     * Submit a {@code LifecycleExecutor} to execute.
+     *
+     * @param lifecycleExecutor lifecycle executor
+     * @return execute future
+     */
+    public CompletableFuture<?> submit(final LifecycleExecutor lifecycleExecutor) {
+        return CompletableFuture.runAsync(lifecycleExecutor, executorService);
+    }
+    
+    /**
+     * Shutdown.
+     */
+    public void shutdown() {
+        if (executorService.isShutdown()) {
+            return;
+        }
+        executorService.shutdown();
+        executorService.shutdownNow();
+    }
+    
     /**
      * Trigger.
      *
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 6d58c4c8fe7..e2cbaafe8d9 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -37,6 +37,7 @@ import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
@@ -120,6 +121,10 @@ public abstract class AbstractPipelineJob implements PipelineJob {
         return new ArrayList<>(tasksRunnerMap.keySet());
     }
     
+    protected Collection<PipelineTasksRunner> getTasksRunners() {
+        return Collections.unmodifiableCollection(tasksRunnerMap.values());
+    }
+    
     protected boolean addTasksRunner(final int shardingItem, final PipelineTasksRunner tasksRunner) {
         if (null != tasksRunnerMap.putIfAbsent(shardingItem, tasksRunner)) {
             log.warn("shardingItem {} tasks runner exists, ignore", shardingItem);
@@ -172,7 +177,6 @@ public abstract class AbstractPipelineJob implements PipelineJob {
     }
     
     private void innerClean() {
-        tasksRunnerMap.clear();
         PipelineJobProgressPersistService.remove(jobId);
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
index 2aa8225d639..3a99f61280f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.job;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
+import org.apache.shardingsphere.data.pipeline.core.util.CloseUtils;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
 
@@ -45,6 +46,14 @@ public abstract class AbstractSimplePipelineJob extends AbstractPipelineJob impl
     
     @Override
     public void execute(final ShardingContext shardingContext) {
+        try {
+            execute0(shardingContext);
+        } finally {
+            clean();
+        }
+    }
+    
+    private void execute0(final ShardingContext shardingContext) {
         String jobId = shardingContext.getJobName();
         int shardingItem = shardingContext.getShardingItem();
         log.info("Execute job {}-{}", jobId, shardingItem);
@@ -62,4 +71,10 @@ public abstract class AbstractSimplePipelineJob extends AbstractPipelineJob impl
         log.info("start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
         tasksRunner.start();
     }
+    
+    private void clean() {
+        for (PipelineTasksRunner each : getTasksRunners()) {
+            CloseUtils.closeQuietly(each.getJobItemContext().getJobProcessContext());
+        }
+    }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineLazyInitializer.java
similarity index 50%
copy from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
copy to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineLazyInitializer.java
index 9b41f930bd6..96b941d37de 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineLazyInitializer.java
@@ -15,19 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.context;
+package org.apache.shardingsphere.data.pipeline.core.util;
 
-import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import org.apache.commons.lang3.concurrent.ConcurrentException;
+import org.apache.commons.lang3.concurrent.LazyInitializer;
+
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
- * Pipeline process context.
+ * Pipeline lazy initializer.
+ *
+ * @param <T> the type of the object managed by this initializer class
  */
-public interface PipelineProcessContext {
+public abstract class PipelineLazyInitializer<T> extends LazyInitializer<T> {
+    
+    private final AtomicBoolean initialized = new AtomicBoolean();
+    
+    @Override
+    protected final T initialize() throws ConcurrentException {
+        T result = doInitialize();
+        initialized.set(true);
+        return result;
+    }
+    
+    public boolean isInitialized() {
+        return initialized.get();
+    }
     
-    /**
-     * Get pipeline process config.
-     *
-     * @return pipeline process config
-     */
-    PipelineProcessConfiguration getPipelineProcessConfig();
+    protected abstract T doInitialize() throws ConcurrentException;
 }
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
index aa937ff48ad..6c312f91193 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
@@ -49,6 +49,8 @@ public final class ConsistencyCheckJobItemContext implements PipelineJobItemCont
     
     private final ConsistencyCheckJobItemProgressContext progressContext;
     
+    private final ConsistencyCheckProcessContext processContext;
+    
     public ConsistencyCheckJobItemContext(final ConsistencyCheckJobConfiguration jobConfig, final int shardingItem, final JobStatus status, final ConsistencyCheckJobItemProgress jobItemProgress) {
         this.jobConfig = jobConfig;
         jobId = jobConfig.getJobId();
@@ -59,10 +61,11 @@ public final class ConsistencyCheckJobItemContext implements PipelineJobItemCont
             progressContext.getCheckedRecordsCount().set(Optional.ofNullable(jobItemProgress.getCheckedRecordsCount()).orElse(0L));
             Optional.ofNullable(jobItemProgress.getTableCheckPositions()).ifPresent(progressContext.getTableCheckPositions()::putAll);
         }
+        processContext = new ConsistencyCheckProcessContext(jobId);
     }
     
     @Override
     public PipelineProcessContext getJobProcessContext() {
-        throw new UnsupportedOperationException();
+        return processContext;
     }
 }
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckProcessContext.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckProcessContext.java
new file mode 100644
index 00000000000..4f6a81cd7b0
--- /dev/null
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckProcessContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context;
+
+import lombok.Getter;
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.concurrent.ConcurrentException;
+import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
+import org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtils;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineLazyInitializer;
+
+/**
+ * Consistency check process context.
+ */
+@Getter
+public final class ConsistencyCheckProcessContext implements PipelineProcessContext {
+    
+    private final PipelineLazyInitializer<ExecuteEngine> consistencyCheckExecuteEngineLazyInitializer;
+    
+    public ConsistencyCheckProcessContext(final String jobId) {
+        consistencyCheckExecuteEngineLazyInitializer = new PipelineLazyInitializer<ExecuteEngine>() {
+            
+            @Override
+            protected ExecuteEngine doInitialize() {
+                return ExecuteEngine.newFixedThreadInstance(1, jobId + "-check");
+            }
+        };
+    }
+    
+    @Override
+    public PipelineProcessConfiguration getPipelineProcessConfig() {
+        return PipelineProcessConfigurationUtils.convertWithDefaultValue(null);
+    }
+    
+    /**
+     * Get consistency check execute engine.
+     *
+     * @return consistency check execute engine
+     */
+    @SneakyThrows(ConcurrentException.class)
+    public ExecuteEngine getConsistencyCheckExecuteEngine() {
+        return consistencyCheckExecuteEngineLazyInitializer.get();
+    }
+    
+    @Override
+    public void close() throws Exception {
+        if (consistencyCheckExecuteEngineLazyInitializer.isInitialized()) {
+            consistencyCheckExecuteEngineLazyInitializer.get().shutdown();
+        }
+    }
+}
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 105d1ce09fb..645433fc36f 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -39,7 +39,9 @@ import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 
 import java.sql.SQLException;
+import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -61,8 +63,6 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
     
     private final LifecycleExecutor checkExecutor;
     
-    private final ExecuteCallback checkExecuteCallback;
-    
     private final AtomicReference<DataConsistencyCalculateAlgorithm> calculateAlgorithm = new AtomicReference<>();
     
     public ConsistencyCheckTasksRunner(final ConsistencyCheckJobItemContext jobItemContext) {
@@ -71,7 +71,6 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
         checkJobId = checkJobConfig.getJobId();
         parentJobId = checkJobConfig.getParentJobId();
         checkExecutor = new CheckLifecycleExecutor();
-        checkExecuteCallback = new CheckExecuteCallback();
     }
     
     @Override
@@ -80,8 +79,8 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
             return;
         }
         TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getTypeName()).persistJobItemProgress(jobItemContext);
-        ExecuteEngine executeEngine = ExecuteEngine.newFixedThreadInstance(1, checkJobId + "-check");
-        executeEngine.submit(checkExecutor, checkExecuteCallback);
+        CompletableFuture<?> future = jobItemContext.getProcessContext().getConsistencyCheckExecuteEngine().submit(checkExecutor);
+        ExecuteEngine.trigger(Collections.singletonList(future), new CheckExecuteCallback());
     }
     
     @Override