You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/11/01 07:26:08 UTC

[shardingsphere] branch master updated: Merge PipelineJobItemAPI into PipelineJobAPI (#21880)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new e7050f3d4cc Merge PipelineJobItemAPI into PipelineJobAPI (#21880)
e7050f3d4cc is described below

commit e7050f3d4cc95f6b2e492b93a18d98c130ad1992
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Tue Nov 1 15:26:01 2022 +0800

    Merge PipelineJobItemAPI into PipelineJobAPI (#21880)
---
 .../data/pipeline/core/api/PipelineJobAPI.java     | 30 ++++++-
 .../data/pipeline/core/api/PipelineJobItemAPI.java | 53 ------------
 .../AbstractInventoryIncrementalJobAPIImpl.java    | 60 +++++++++++---
 .../impl/InventoryIncrementalJobItemAPIImpl.java   | 93 ----------------------
 .../core/task/InventoryIncrementalTasksRunner.java | 16 ++--
 .../consistencycheck/ConsistencyCheckJobAPI.java   |  3 +-
 6 files changed, 91 insertions(+), 164 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
index fa31460c16f..1e8913732ad 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
@@ -22,8 +22,11 @@ import org.apache.shardingsphere.data.pipeline.api.config.PipelineTaskConfigurat
 import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
 import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 
@@ -33,7 +36,7 @@ import java.util.Optional;
  * Pipeline job API.
  */
 @SingletonSPI
-public interface PipelineJobAPI extends PipelineJobPublicAPI, PipelineJobItemAPI, TypedSPI {
+public interface PipelineJobAPI extends PipelineJobPublicAPI, TypedSPI {
     
     /**
      * Marshal pipeline job id.
@@ -84,6 +87,31 @@ public interface PipelineJobAPI extends PipelineJobPublicAPI, PipelineJobItemAPI
      */
     PipelineJobConfiguration getJobConfiguration(String jobId);
     
+    /**
+     * Persist job item progress.
+     *
+     * @param jobItemContext job item context
+     */
+    void persistJobItemProgress(PipelineJobItemContext jobItemContext);
+    
+    /**
+     * Get job item progress.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @return job item progress, may be null
+     */
+    PipelineJobItemProgress getJobItemProgress(String jobId, int shardingItem);
+    
+    /**
+     * Update job item status.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @param status status
+     */
+    void updateJobItemStatus(String jobId, int shardingItem, JobStatus status);
+    
     /**
      * Get job item error message.
      *
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobItemAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobItemAPI.java
deleted file mode 100644
index 1a492b79c88..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobItemAPI.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.api;
-
-import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
-
-/**
- * Pipeline job item API.
- */
-public interface PipelineJobItemAPI {
-    
-    /**
-     * Persist job item progress.
-     *
-     * @param jobItemContext job item context
-     */
-    void persistJobItemProgress(PipelineJobItemContext jobItemContext);
-    
-    /**
-     * Get job item progress.
-     *
-     * @param jobId job id
-     * @param shardingItem sharding item
-     * @return job item progress, may be null
-     */
-    PipelineJobItemProgress getJobItemProgress(String jobId, int shardingItem);
-    
-    /**
-     * Update job item status.
-     *
-     * @param jobId job id
-     * @param shardingItem sharding item
-     * @param status status
-     */
-    void updateJobItemStatus(String jobId, int shardingItem, JobStatus status);
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 55e3ffafea9..c32105d2f14 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.api.impl;
 
+import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
@@ -26,12 +27,21 @@ import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProces
 import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventoryTasksProgress;
 import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
+import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
 import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmChooser;
 import org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtil;
+import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.exception.metadata.AlterNotExistProcessConfigurationException;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
+import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
+import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithmFactory;
@@ -40,8 +50,10 @@ import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessC
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
 import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -55,11 +67,11 @@ import java.util.stream.IntStream;
 @Slf4j
 public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPipelineJobAPIImpl implements InventoryIncrementalJobAPI, InventoryIncrementalJobPublicAPI {
     
-    private final YamlPipelineProcessConfigurationSwapper swapper = new YamlPipelineProcessConfigurationSwapper();
+    private final YamlPipelineProcessConfigurationSwapper processConfigSwapper = new YamlPipelineProcessConfigurationSwapper();
     
     private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();
     
-    private final InventoryIncrementalJobItemAPIImpl jobItemAPI = new InventoryIncrementalJobItemAPIImpl();
+    private final YamlInventoryIncrementalJobItemProgressSwapper jobItemProgressSwapper = new YamlInventoryIncrementalJobItemProgressSwapper();
     
     protected abstract String getTargetDatabaseType(PipelineJobConfiguration pipelineJobConfig);
     
@@ -80,7 +92,7 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
     private YamlPipelineProcessConfiguration getTargetYamlProcessConfiguration() {
         PipelineProcessConfiguration existingProcessConfig = processConfigPersistService.load(getJobType());
         ShardingSpherePreconditions.checkNotNull(existingProcessConfig, AlterNotExistProcessConfigurationException::new);
-        return swapper.swapToYamlConfiguration(existingProcessConfig);
+        return processConfigSwapper.swapToYamlConfiguration(existingProcessConfig);
     }
     
     @Override
@@ -89,7 +101,7 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
         PipelineProcessConfigurationUtil.verifyConfPath(confPath);
         YamlPipelineProcessConfiguration targetYamlProcessConfig = getTargetYamlProcessConfiguration();
         PipelineProcessConfigurationUtil.setFieldsNullByConfPath(targetYamlProcessConfig, finalConfPath);
-        processConfigPersistService.persist(getJobType(), swapper.swapToObject(targetYamlProcessConfig));
+        processConfigPersistService.persist(getJobType(), processConfigSwapper.swapToObject(targetYamlProcessConfig));
     }
     
     @Override
@@ -120,18 +132,48 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
     }
     
     @Override
-    public InventoryIncrementalJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
-        return jobItemAPI.getJobItemProgress(jobId, shardingItem);
+    public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) {
+        InventoryIncrementalJobItemContext context = (InventoryIncrementalJobItemContext) jobItemContext;
+        InventoryIncrementalJobItemProgress jobItemProgress = new InventoryIncrementalJobItemProgress();
+        jobItemProgress.setStatus(jobItemContext.getStatus());
+        jobItemProgress.setSourceDatabaseType(jobItemContext.getJobConfig().getSourceDatabaseType());
+        jobItemProgress.setDataSourceName(jobItemContext.getDataSourceName());
+        jobItemProgress.setIncremental(getIncrementalTasksProgress(context.getIncrementalTasks()));
+        jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
+        jobItemProgress.setProcessedRecordsCount(context.getProcessedRecordsCount());
+        jobItemProgress.setInventoryRecordsCount(context.getInventoryRecordsCount());
+        String value = YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration(jobItemProgress));
+        PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), value);
+    }
+    
+    private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final Collection<IncrementalTask> incrementalTasks) {
+        IncrementalTask incrementalTask = incrementalTasks.size() > 0 ? incrementalTasks.iterator().next() : null;
+        return new JobItemIncrementalTasksProgress(null != incrementalTask ? incrementalTask.getTaskProgress() : null);
+    }
+    
+    private JobItemInventoryTasksProgress getInventoryTasksProgress(final Collection<InventoryTask> inventoryTasks) {
+        Map<String, InventoryTaskProgress> inventoryTaskProgressMap = new HashMap<>();
+        for (InventoryTask each : inventoryTasks) {
+            inventoryTaskProgressMap.put(each.getTaskId(), each.getTaskProgress());
+        }
+        return new JobItemInventoryTasksProgress(inventoryTaskProgressMap);
     }
     
     @Override
-    public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) {
-        jobItemAPI.persistJobItemProgress(jobItemContext);
+    public InventoryIncrementalJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
+        String data = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId, shardingItem);
+        return Strings.isNullOrEmpty(data) ? null : jobItemProgressSwapper.swapToObject(YamlEngine.unmarshal(data, YamlInventoryIncrementalJobItemProgress.class));
     }
     
     @Override
     public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
-        jobItemAPI.updateJobItemStatus(jobId, shardingItem, status);
+        InventoryIncrementalJobItemProgress jobItemProgress = getJobItemProgress(jobId, shardingItem);
+        if (null == jobItemProgress) {
+            log.warn("updateJobItemStatus, jobItemProgress is null, jobId={}, shardingItem={}", jobId, shardingItem);
+            return;
+        }
+        jobItemProgress.setStatus(status);
+        PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, shardingItem, YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration(jobItemProgress)));
     }
     
     @Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
deleted file mode 100644
index 01eda47a28c..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.api.impl;
-
-import com.google.common.base.Strings;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventoryTasksProgress;
-import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
-import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
-import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
-import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Inventory incremental job item API implementation.
- */
-@Slf4j
-public final class InventoryIncrementalJobItemAPIImpl implements PipelineJobItemAPI {
-    
-    private static final YamlInventoryIncrementalJobItemProgressSwapper SWAPPER = new YamlInventoryIncrementalJobItemProgressSwapper();
-    
-    @Override
-    public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) {
-        InventoryIncrementalJobItemContext context = (InventoryIncrementalJobItemContext) jobItemContext;
-        InventoryIncrementalJobItemProgress jobItemProgress = new InventoryIncrementalJobItemProgress();
-        jobItemProgress.setStatus(jobItemContext.getStatus());
-        jobItemProgress.setSourceDatabaseType(jobItemContext.getJobConfig().getSourceDatabaseType());
-        jobItemProgress.setDataSourceName(jobItemContext.getDataSourceName());
-        jobItemProgress.setIncremental(getIncrementalTasksProgress(context.getIncrementalTasks()));
-        jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
-        jobItemProgress.setProcessedRecordsCount(context.getProcessedRecordsCount());
-        jobItemProgress.setInventoryRecordsCount(context.getInventoryRecordsCount());
-        String value = YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobItemProgress));
-        PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), value);
-    }
-    
-    private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final Collection<IncrementalTask> incrementalTasks) {
-        IncrementalTask incrementalTask = incrementalTasks.size() > 0 ? incrementalTasks.iterator().next() : null;
-        return new JobItemIncrementalTasksProgress(null != incrementalTask ? incrementalTask.getTaskProgress() : null);
-    }
-    
-    private JobItemInventoryTasksProgress getInventoryTasksProgress(final Collection<InventoryTask> inventoryTasks) {
-        Map<String, InventoryTaskProgress> inventoryTaskProgressMap = new HashMap<>();
-        for (InventoryTask each : inventoryTasks) {
-            inventoryTaskProgressMap.put(each.getTaskId(), each.getTaskProgress());
-        }
-        return new JobItemInventoryTasksProgress(inventoryTaskProgressMap);
-    }
-    
-    @Override
-    public InventoryIncrementalJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
-        String data = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId, shardingItem);
-        return Strings.isNullOrEmpty(data) ? null : SWAPPER.swapToObject(YamlEngine.unmarshal(data, YamlInventoryIncrementalJobItemProgress.class));
-    }
-    
-    @Override
-    public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
-        InventoryIncrementalJobItemProgress jobItemProgress = getJobItemProgress(jobId, shardingItem);
-        if (null == jobItemProgress) {
-            log.warn("updateJobItemStatus, jobItemProgress is null, jobId={}, shardingItem={}", jobId, shardingItem);
-            return;
-        }
-        jobItemProgress.setStatus(status);
-        PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, shardingItem, YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobItemProgress)));
-    }
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
index 40047a92600..4c7b7459edf 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
@@ -26,8 +26,6 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
-import org.apache.shardingsphere.data.pipeline.core.api.impl.InventoryIncrementalJobItemAPIImpl;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
 
@@ -42,8 +40,6 @@ import java.util.concurrent.CompletableFuture;
 @Slf4j
 public final class InventoryIncrementalTasksRunner implements PipelineTasksRunner {
     
-    private final PipelineJobItemAPI jobItemAPI = new InventoryIncrementalJobItemAPIImpl();
-    
     @Getter
     private final PipelineJobItemContext jobItemContext;
     
@@ -51,6 +47,15 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
     
     private final Collection<IncrementalTask> incrementalTasks;
     
+    private final PipelineJobAPI jobAPI;
+    
+    public InventoryIncrementalTasksRunner(final PipelineJobItemContext jobItemContext, final Collection<InventoryTask> inventoryTasks, final Collection<IncrementalTask> incrementalTasks) {
+        this.jobItemContext = jobItemContext;
+        this.inventoryTasks = inventoryTasks;
+        this.incrementalTasks = incrementalTasks;
+        jobAPI = PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()));
+    }
+    
     @Override
     public void stop() {
         jobItemContext.setStopping(true);
@@ -100,7 +105,6 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
                 log.error("onFailure, inventory task execute failed.", throwable);
                 updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE);
                 String jobId = jobItemContext.getJobId();
-                PipelineJobAPI jobAPI = PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobId));
                 jobAPI.persistJobItemErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
                 jobAPI.stop(jobId);
             }
@@ -120,7 +124,7 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
     
     private void updateLocalAndRemoteJobItemStatus(final JobStatus jobStatus) {
         jobItemContext.setStatus(jobStatus);
-        jobItemAPI.updateJobItemStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
+        jobAPI.updateJobItemStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
     }
     
     private synchronized void executeIncrementalTask() {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java
index d8495582568..3d328d8fffb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java
@@ -19,10 +19,9 @@ package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
 
 import org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
 
 /**
  * Consistency check job API.
  */
-public interface ConsistencyCheckJobAPI extends ConsistencyCheckJobPublicAPI, PipelineJobAPI, PipelineJobItemAPI {
+public interface ConsistencyCheckJobAPI extends ConsistencyCheckJobPublicAPI, PipelineJobAPI {
 }