You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by az...@apache.org on 2023/06/09 07:38:34 UTC
[shardingsphere] branch master updated: Fix execute engine not closed after job stopping (#26212)
This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 9389d93f340 Fix execute engine not closed after job stopping (#26212)
9389d93f340 is described below
commit 9389d93f340dc71618ed6e4884e15efaa24e09fb
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Fri Jun 9 15:38:24 2023 +0800
Fix execute engine not closed after job stopping (#26212)
* Fix execute engine not closed after job stopping
* Implement getJobProcessContext() for consistency check
* Manage consistency check execute engine in process context
* Shut down execute engine after all runnables have completed
* Block on ConsistencyCheckTasksRunner.start
* Shutdown thread pool for CDC job
* Refactor resource cleanup
---
.../data/pipeline/cdc/core/job/CDCJob.java | 14 +++++
.../api/context/PipelineProcessContext.java | 2 +-
...AbstractInventoryIncrementalProcessContext.java | 33 +++++++----
.../data/pipeline/core/execute/ExecuteEngine.java | 21 +++++++
.../pipeline/core/job/AbstractPipelineJob.java | 6 +-
.../core/job/AbstractSimplePipelineJob.java | 15 +++++
.../util/PipelineLazyInitializer.java} | 33 +++++++----
.../context/ConsistencyCheckJobItemContext.java | 5 +-
.../context/ConsistencyCheckProcessContext.java | 68 ++++++++++++++++++++++
.../task/ConsistencyCheckTasksRunner.java | 9 ++-
10 files changed, 178 insertions(+), 28 deletions(-)
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index fbfdfe806a6..3a4e490538b 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -73,6 +73,14 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob {
@Override
public void execute(final ShardingContext shardingContext) {
+ try {
+ execute0(shardingContext);
+ } finally {
+ clean();
+ }
+ }
+
+ private void execute0(final ShardingContext shardingContext) {
String jobId = shardingContext.getJobName();
log.info("Execute job {}", jobId);
CDCJobConfiguration jobConfig = new YamlCDCJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
@@ -100,6 +108,12 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob {
executeIncrementalTasks(jobItemContexts);
}
+ private void clean() {
+ for (PipelineTasksRunner each : getTasksRunners()) {
+ CloseUtils.closeQuietly(each.getJobItemContext().getJobProcessContext());
+ }
+ }
+
private CDCJobItemContext buildPipelineJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem) {
Optional<InventoryIncrementalJobItemProgress> initProgress = jobAPI.getJobItemProgress(jobConfig.getJobId(), shardingItem);
CDCProcessContext jobProcessContext = jobAPI.buildPipelineProcessContext(jobConfig);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
index 9b41f930bd6..3ba3bafb562 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
@@ -22,7 +22,7 @@ import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProces
/**
* Pipeline process context.
*/
-public interface PipelineProcessContext {
+public interface PipelineProcessContext extends AutoCloseable {
/**
* Get pipeline process config.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractInventoryIncrementalProcessContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractInventoryIncrementalProcessContext.java
index e17877a2304..5eb97214c06 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractInventoryIncrementalProcessContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractInventoryIncrementalProcessContext.java
@@ -20,12 +20,12 @@ package org.apache.shardingsphere.data.pipeline.core.context;
import lombok.Getter;
import lombok.SneakyThrows;
import org.apache.commons.lang3.concurrent.ConcurrentException;
-import org.apache.commons.lang3.concurrent.LazyInitializer;
import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineReadConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineWriteConfiguration;
import org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtils;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineLazyInitializer;
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
@@ -45,11 +45,11 @@ public abstract class AbstractInventoryIncrementalProcessContext implements Inve
private final PipelineChannelCreator pipelineChannelCreator;
- private final LazyInitializer<ExecuteEngine> inventoryDumperExecuteEngineLazyInitializer;
+ private final PipelineLazyInitializer<ExecuteEngine> inventoryDumperExecuteEngineLazyInitializer;
- private final LazyInitializer<ExecuteEngine> inventoryImporterExecuteEngineLazyInitializer;
+ private final PipelineLazyInitializer<ExecuteEngine> inventoryImporterExecuteEngineLazyInitializer;
- private final LazyInitializer<ExecuteEngine> incrementalExecuteEngineLazyInitializer;
+ private final PipelineLazyInitializer<ExecuteEngine> incrementalExecuteEngineLazyInitializer;
protected AbstractInventoryIncrementalProcessContext(final String jobId, final PipelineProcessConfiguration originalProcessConfig) {
PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(originalProcessConfig);
@@ -62,24 +62,24 @@ public abstract class AbstractInventoryIncrementalProcessContext implements Inve
writeRateLimitAlgorithm = null == writeRateLimiter ? null : TypedSPILoader.getService(JobRateLimitAlgorithm.class, writeRateLimiter.getType(), writeRateLimiter.getProps());
AlgorithmConfiguration streamChannel = processConfig.getStreamChannel();
pipelineChannelCreator = TypedSPILoader.getService(PipelineChannelCreator.class, streamChannel.getType(), streamChannel.getProps());
- inventoryDumperExecuteEngineLazyInitializer = new LazyInitializer<ExecuteEngine>() {
+ inventoryDumperExecuteEngineLazyInitializer = new PipelineLazyInitializer<ExecuteEngine>() {
@Override
- protected ExecuteEngine initialize() {
+ protected ExecuteEngine doInitialize() {
return ExecuteEngine.newFixedThreadInstance(readConfig.getWorkerThread(), "Inventory-" + jobId);
}
};
- inventoryImporterExecuteEngineLazyInitializer = new LazyInitializer<ExecuteEngine>() {
+ inventoryImporterExecuteEngineLazyInitializer = new PipelineLazyInitializer<ExecuteEngine>() {
@Override
- protected ExecuteEngine initialize() {
+ protected ExecuteEngine doInitialize() {
return ExecuteEngine.newFixedThreadInstance(writeConfig.getWorkerThread(), "Importer-" + jobId);
}
};
- incrementalExecuteEngineLazyInitializer = new LazyInitializer<ExecuteEngine>() {
+ incrementalExecuteEngineLazyInitializer = new PipelineLazyInitializer<ExecuteEngine>() {
@Override
- protected ExecuteEngine initialize() {
+ protected ExecuteEngine doInitialize() {
return ExecuteEngine.newCachedThreadInstance("Incremental-" + jobId);
}
};
@@ -106,4 +106,17 @@ public abstract class AbstractInventoryIncrementalProcessContext implements Inve
public ExecuteEngine getIncrementalExecuteEngine() {
return incrementalExecuteEngineLazyInitializer.get();
}
+
+ @Override
+ public void close() throws Exception {
+ shutdownExecuteEngine(inventoryDumperExecuteEngineLazyInitializer);
+ shutdownExecuteEngine(inventoryImporterExecuteEngineLazyInitializer);
+ shutdownExecuteEngine(incrementalExecuteEngineLazyInitializer);
+ }
+
+ private void shutdownExecuteEngine(final PipelineLazyInitializer<ExecuteEngine> lazyInitializer) throws ConcurrentException {
+ if (lazyInitializer.isInitialized()) {
+ lazyInitializer.get().shutdown();
+ }
+ }
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngine.java
index 1d50fd4deec..629a4844a9d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/ExecuteEngine.java
@@ -86,6 +86,27 @@ public final class ExecuteEngine {
}, executorService);
}
+ /**
+ * Submit a {@code LifecycleExecutor} to execute.
+ *
+ * @param lifecycleExecutor lifecycle executor
+ * @return execute future
+ */
+ public CompletableFuture<?> submit(final LifecycleExecutor lifecycleExecutor) {
+ return CompletableFuture.runAsync(lifecycleExecutor, executorService);
+ }
+
+ /**
+ * Shutdown.
+ */
+ public void shutdown() {
+ if (executorService.isShutdown()) {
+ return;
+ }
+ executorService.shutdown();
+ executorService.shutdownNow();
+ }
+
/**
* Trigger.
*
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 6d58c4c8fe7..e2cbaafe8d9 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -37,6 +37,7 @@ import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
@@ -120,6 +121,10 @@ public abstract class AbstractPipelineJob implements PipelineJob {
return new ArrayList<>(tasksRunnerMap.keySet());
}
+ protected Collection<PipelineTasksRunner> getTasksRunners() {
+ return Collections.unmodifiableCollection(tasksRunnerMap.values());
+ }
+
protected boolean addTasksRunner(final int shardingItem, final PipelineTasksRunner tasksRunner) {
if (null != tasksRunnerMap.putIfAbsent(shardingItem, tasksRunner)) {
log.warn("shardingItem {} tasks runner exists, ignore", shardingItem);
@@ -172,7 +177,6 @@ public abstract class AbstractPipelineJob implements PipelineJob {
}
private void innerClean() {
- tasksRunnerMap.clear();
PipelineJobProgressPersistService.remove(jobId);
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
index 2aa8225d639..3a99f61280f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.job;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
+import org.apache.shardingsphere.data.pipeline.core.util.CloseUtils;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@@ -45,6 +46,14 @@ public abstract class AbstractSimplePipelineJob extends AbstractPipelineJob impl
@Override
public void execute(final ShardingContext shardingContext) {
+ try {
+ execute0(shardingContext);
+ } finally {
+ clean();
+ }
+ }
+
+ private void execute0(final ShardingContext shardingContext) {
String jobId = shardingContext.getJobName();
int shardingItem = shardingContext.getShardingItem();
log.info("Execute job {}-{}", jobId, shardingItem);
@@ -62,4 +71,10 @@ public abstract class AbstractSimplePipelineJob extends AbstractPipelineJob impl
log.info("start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
tasksRunner.start();
}
+
+ private void clean() {
+ for (PipelineTasksRunner each : getTasksRunners()) {
+ CloseUtils.closeQuietly(each.getJobItemContext().getJobProcessContext());
+ }
+ }
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineLazyInitializer.java
similarity index 50%
copy from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
copy to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineLazyInitializer.java
index 9b41f930bd6..96b941d37de 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineLazyInitializer.java
@@ -15,19 +15,32 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.context;
+package org.apache.shardingsphere.data.pipeline.core.util;
-import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import org.apache.commons.lang3.concurrent.ConcurrentException;
+import org.apache.commons.lang3.concurrent.LazyInitializer;
+
+import java.util.concurrent.atomic.AtomicBoolean;
/**
- * Pipeline process context.
+ * Pipeline lazy initializer.
+ *
+ * @param <T> the type of the object managed by this initializer class
*/
-public interface PipelineProcessContext {
+public abstract class PipelineLazyInitializer<T> extends LazyInitializer<T> {
+
+ private final AtomicBoolean initialized = new AtomicBoolean();
+
+ @Override
+ protected final T initialize() throws ConcurrentException {
+ T result = doInitialize();
+ initialized.set(true);
+ return result;
+ }
+
+ public boolean isInitialized() {
+ return initialized.get();
+ }
- /**
- * Get pipeline process config.
- *
- * @return pipeline process config
- */
- PipelineProcessConfiguration getPipelineProcessConfig();
+ protected abstract T doInitialize() throws ConcurrentException;
}
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
index aa937ff48ad..6c312f91193 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
@@ -49,6 +49,8 @@ public final class ConsistencyCheckJobItemContext implements PipelineJobItemCont
private final ConsistencyCheckJobItemProgressContext progressContext;
+ private final ConsistencyCheckProcessContext processContext;
+
public ConsistencyCheckJobItemContext(final ConsistencyCheckJobConfiguration jobConfig, final int shardingItem, final JobStatus status, final ConsistencyCheckJobItemProgress jobItemProgress) {
this.jobConfig = jobConfig;
jobId = jobConfig.getJobId();
@@ -59,10 +61,11 @@ public final class ConsistencyCheckJobItemContext implements PipelineJobItemCont
progressContext.getCheckedRecordsCount().set(Optional.ofNullable(jobItemProgress.getCheckedRecordsCount()).orElse(0L));
Optional.ofNullable(jobItemProgress.getTableCheckPositions()).ifPresent(progressContext.getTableCheckPositions()::putAll);
}
+ processContext = new ConsistencyCheckProcessContext(jobId);
}
@Override
public PipelineProcessContext getJobProcessContext() {
- throw new UnsupportedOperationException();
+ return processContext;
}
}
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckProcessContext.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckProcessContext.java
new file mode 100644
index 00000000000..4f6a81cd7b0
--- /dev/null
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckProcessContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context;
+
+import lombok.Getter;
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.concurrent.ConcurrentException;
+import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
+import org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtils;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineLazyInitializer;
+
+/**
+ * Consistency check process context.
+ */
+@Getter
+public final class ConsistencyCheckProcessContext implements PipelineProcessContext {
+
+ private final PipelineLazyInitializer<ExecuteEngine> consistencyCheckExecuteEngineLazyInitializer;
+
+ public ConsistencyCheckProcessContext(final String jobId) {
+ consistencyCheckExecuteEngineLazyInitializer = new PipelineLazyInitializer<ExecuteEngine>() {
+
+ @Override
+ protected ExecuteEngine doInitialize() {
+ return ExecuteEngine.newFixedThreadInstance(1, jobId + "-check");
+ }
+ };
+ }
+
+ @Override
+ public PipelineProcessConfiguration getPipelineProcessConfig() {
+ return PipelineProcessConfigurationUtils.convertWithDefaultValue(null);
+ }
+
+ /**
+ * Get consistency check execute engine.
+ *
+ * @return consistency check execute engine
+ */
+ @SneakyThrows(ConcurrentException.class)
+ public ExecuteEngine getConsistencyCheckExecuteEngine() {
+ return consistencyCheckExecuteEngineLazyInitializer.get();
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (consistencyCheckExecuteEngineLazyInitializer.isInitialized()) {
+ consistencyCheckExecuteEngineLazyInitializer.get().shutdown();
+ }
+ }
+}
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 105d1ce09fb..645433fc36f 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -39,7 +39,9 @@ import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import java.sql.SQLException;
+import java.util.Collections;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -61,8 +63,6 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
private final LifecycleExecutor checkExecutor;
- private final ExecuteCallback checkExecuteCallback;
-
private final AtomicReference<DataConsistencyCalculateAlgorithm> calculateAlgorithm = new AtomicReference<>();
public ConsistencyCheckTasksRunner(final ConsistencyCheckJobItemContext jobItemContext) {
@@ -71,7 +71,6 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
checkJobId = checkJobConfig.getJobId();
parentJobId = checkJobConfig.getParentJobId();
checkExecutor = new CheckLifecycleExecutor();
- checkExecuteCallback = new CheckExecuteCallback();
}
@Override
@@ -80,8 +79,8 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
return;
}
TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getTypeName()).persistJobItemProgress(jobItemContext);
- ExecuteEngine executeEngine = ExecuteEngine.newFixedThreadInstance(1, checkJobId + "-check");
- executeEngine.submit(checkExecutor, checkExecuteCallback);
+ CompletableFuture<?> future = jobItemContext.getProcessContext().getConsistencyCheckExecuteEngine().submit(checkExecutor);
+ ExecuteEngine.trigger(Collections.singletonList(future), new CheckExecuteCallback());
}
@Override