You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/05/31 05:52:46 UTC

[shardingsphere] branch master updated: Use interface PipelineTask instead of implementation as possible (#25955)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2932943a6da Use interface PipelineTask instead of implementation as possible (#25955)
2932943a6da is described below

commit 2932943a6da367fadadac0fb9f60e197ca066168
Author: 吴伟杰 <wu...@apache.org>
AuthorDate: Wed May 31 13:52:29 2023 +0800

    Use interface PipelineTask instead of implementation as possible (#25955)
---
 .../cdc/context/job/CDCJobItemContext.java         |  7 +++----
 .../pipeline/cdc/core/task/CDCTasksRunner.java     |  5 ++---
 .../api/task/progress/IncrementalTaskProgress.java |  6 +-----
 .../pipeline/api/task/progress/TaskProgress.java   |  9 ++++++++
 .../AbstractInventoryIncrementalJobAPIImpl.java    | 14 ++++++-------
 .../InventoryIncrementalJobItemContext.java        |  7 +++----
 .../job/progress/PipelineJobProgressDetector.java  |  7 ++++---
 .../data/pipeline/core/task/IncrementalTask.java   |  2 +-
 .../core/task/InventoryIncrementalTasksRunner.java | 14 ++++++-------
 .../data/pipeline/core/task/InventoryTask.java     |  2 +-
 .../data/pipeline/core/task/PipelineTask.java      |  5 +++++
 .../migration/context/MigrationJobItemContext.java |  7 +++----
 .../FixtureInventoryIncrementalJobItemContext.java |  7 +++----
 .../data/pipeline/core/task/InventoryTaskTest.java | 24 ++++++++++------------
 14 files changed, 60 insertions(+), 56 deletions(-)

diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
index e5111de8626..f42ef50242b 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
@@ -34,8 +34,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
-import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
 
 import java.util.Collection;
@@ -68,9 +67,9 @@ public final class CDCJobItemContext implements InventoryIncrementalJobItemConte
     
     private final ImporterConnector importerConnector;
     
-    private final Collection<InventoryTask> inventoryTasks = new LinkedList<>();
+    private final Collection<PipelineTask> inventoryTasks = new LinkedList<>();
     
-    private final Collection<IncrementalTask> incrementalTasks = new LinkedList<>();
+    private final Collection<PipelineTask> incrementalTasks = new LinkedList<>();
     
     private final AtomicLong processedRecordsCount = new AtomicLong(0);
     
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java
index 1497458c92d..9f78ddf90b8 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java
@@ -18,9 +18,8 @@
 package org.apache.shardingsphere.data.pipeline.cdc.core.task;
 
 import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryIncrementalTasksRunner;
-import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 
 import java.util.Collection;
 
@@ -29,7 +28,7 @@ import java.util.Collection;
  */
 public final class CDCTasksRunner extends InventoryIncrementalTasksRunner {
     
-    public CDCTasksRunner(final PipelineJobItemContext jobItemContext, final Collection<InventoryTask> inventoryTasks, final Collection<IncrementalTask> incrementalTasks) {
+    public CDCTasksRunner(final PipelineJobItemContext jobItemContext, final Collection<PipelineTask> inventoryTasks, final Collection<PipelineTask> incrementalTasks) {
         super(jobItemContext, inventoryTasks, incrementalTasks);
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java
index d50e5755150..04870659724 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java
@@ -35,11 +35,7 @@ public final class IncrementalTaskProgress implements TaskProgress {
         incrementalTaskDelay.set(new IncrementalTaskDelay());
     }
     
-    /**
-     * Get position.
-     * 
-     * @return position
-     */
+    @Override
     public IngestPosition getPosition() {
         return position.get();
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/TaskProgress.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/TaskProgress.java
index aec203b033d..957c03a8d9a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/TaskProgress.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/TaskProgress.java
@@ -17,8 +17,17 @@
 
 package org.apache.shardingsphere.data.pipeline.api.task.progress;
 
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+
 /**
  * Task progress.
  */
 public interface TaskProgress {
+    
+    /**
+     * Get position.
+     *
+     * @return position
+     */
+    IngestPosition getPosition();
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index cb65c7538b0..73ff026a3ae 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -33,6 +33,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.JobOffsetInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.TableBasedPipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
 import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
 import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
@@ -47,8 +48,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInvent
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobOffsetInfo;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobOffsetInfoSwapper;
-import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
-import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
@@ -158,14 +158,14 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
         PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(jobId, context.getShardingItem(), value);
     }
     
-    private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final Collection<IncrementalTask> incrementalTasks) {
-        return new JobItemIncrementalTasksProgress(incrementalTasks.isEmpty() ? null : incrementalTasks.iterator().next().getTaskProgress());
+    private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final Collection<PipelineTask> incrementalTasks) {
+        return new JobItemIncrementalTasksProgress(incrementalTasks.isEmpty() ? null : (IncrementalTaskProgress) incrementalTasks.iterator().next().getTaskProgress());
     }
     
-    private JobItemInventoryTasksProgress getInventoryTasksProgress(final Collection<InventoryTask> inventoryTasks) {
+    private JobItemInventoryTasksProgress getInventoryTasksProgress(final Collection<PipelineTask> inventoryTasks) {
         Map<String, InventoryTaskProgress> inventoryTaskProgressMap = new HashMap<>();
-        for (InventoryTask each : inventoryTasks) {
-            inventoryTaskProgressMap.put(each.getTaskId(), each.getTaskProgress());
+        for (PipelineTask each : inventoryTasks) {
+            inventoryTaskProgressMap.put(each.getTaskId(), (InventoryTaskProgress) each.getTaskProgress());
         }
         return new JobItemInventoryTasksProgress(inventoryTaskProgressMap);
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
index fb9361baff4..c77a33219b2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
@@ -21,8 +21,7 @@ import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContex
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
-import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
 
 import java.util.Collection;
@@ -40,14 +39,14 @@ public interface InventoryIncrementalJobItemContext extends PipelineJobItemConte
      *
      * @return inventory tasks
      */
-    Collection<InventoryTask> getInventoryTasks();
+    Collection<PipelineTask> getInventoryTasks();
     
     /**
      * Get incremental tasks.
      *
      * @return incremental tasks
      */
-    Collection<IncrementalTask> getIncrementalTasks();
+    Collection<PipelineTask> getIncrementalTasks();
     
     /**
      * Get init progress.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetector.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetector.java
index f583b736789..d018ddf09ed 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetector.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetector.java
@@ -21,7 +21,8 @@ import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
-import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 
 import java.util.Collection;
 
@@ -38,10 +39,10 @@ public final class PipelineJobProgressDetector {
      * @param inventoryTasks to check inventory tasks
      * @return is finished
      */
-    public static boolean allInventoryTasksFinished(final Collection<InventoryTask> inventoryTasks) {
+    public static boolean allInventoryTasksFinished(final Collection<PipelineTask> inventoryTasks) {
         if (inventoryTasks.isEmpty()) {
             log.warn("inventoryTasks is empty");
         }
-        return inventoryTasks.stream().allMatch(each -> each.getTaskProgress().getPosition() instanceof FinishedPosition);
+        return inventoryTasks.stream().allMatch(each -> ((InventoryTaskProgress) each.getTaskProgress()).getPosition() instanceof FinishedPosition);
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 75426260d90..e028e86a492 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -54,7 +54,7 @@ import java.util.concurrent.CompletableFuture;
  */
 @Slf4j
 @ToString(exclude = {"incrementalExecuteEngine", "channel", "dumper", "importers", "taskProgress"})
-public final class IncrementalTask implements PipelineTask, AutoCloseable {
+public final class IncrementalTask implements PipelineTask {
     
     @Getter
     private final String taskId;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
index 13432217b60..e5dc2fceb3b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
@@ -45,13 +45,13 @@ public class InventoryIncrementalTasksRunner implements PipelineTasksRunner {
     @Getter
     private final PipelineJobItemContext jobItemContext;
     
-    private final Collection<InventoryTask> inventoryTasks;
+    private final Collection<PipelineTask> inventoryTasks;
     
-    private final Collection<IncrementalTask> incrementalTasks;
+    private final Collection<PipelineTask> incrementalTasks;
     
     private final PipelineJobAPI jobAPI;
     
-    public InventoryIncrementalTasksRunner(final PipelineJobItemContext jobItemContext, final Collection<InventoryTask> inventoryTasks, final Collection<IncrementalTask> incrementalTasks) {
+    public InventoryIncrementalTasksRunner(final PipelineJobItemContext jobItemContext, final Collection<PipelineTask> inventoryTasks, final Collection<PipelineTask> incrementalTasks) {
         this.jobItemContext = jobItemContext;
         this.inventoryTasks = inventoryTasks;
         this.incrementalTasks = incrementalTasks;
@@ -61,11 +61,11 @@ public class InventoryIncrementalTasksRunner implements PipelineTasksRunner {
     @Override
     public void stop() {
         jobItemContext.setStopping(true);
-        for (InventoryTask each : inventoryTasks) {
+        for (PipelineTask each : inventoryTasks) {
             each.stop();
             each.close();
         }
-        for (IncrementalTask each : incrementalTasks) {
+        for (PipelineTask each : incrementalTasks) {
             each.stop();
             each.close();
         }
@@ -88,7 +88,7 @@ public class InventoryIncrementalTasksRunner implements PipelineTasksRunner {
     private synchronized void executeInventoryTask() {
         updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INVENTORY_TASK);
         Collection<CompletableFuture<?>> futures = new LinkedList<>();
-        for (InventoryTask each : inventoryTasks) {
+        for (PipelineTask each : inventoryTasks) {
             if (each.getTaskProgress().getPosition() instanceof FinishedPosition) {
                 continue;
             }
@@ -113,7 +113,7 @@ public class InventoryIncrementalTasksRunner implements PipelineTasksRunner {
         }
         updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INCREMENTAL_TASK);
         Collection<CompletableFuture<?>> futures = new LinkedList<>();
-        for (IncrementalTask each : incrementalTasks) {
+        for (PipelineTask each : incrementalTasks) {
             if (each.getTaskProgress().getPosition() instanceof FinishedPosition) {
                 continue;
             }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 852230ed3c0..4adad77d972 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -50,7 +50,7 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 @ToString(exclude = {"inventoryDumperExecuteEngine", "inventoryImporterExecuteEngine", "channel", "dumper", "importer"})
 @Slf4j
-public final class InventoryTask implements PipelineTask, AutoCloseable {
+public final class InventoryTask implements PipelineTask {
     
     @Getter
     private final String taskId;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
index 98690873d30..1b090ee9b2a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
@@ -52,4 +52,9 @@ public interface PipelineTask {
      * @return task progress
      */
     TaskProgress getTaskProgress();
+    
+    /**
+     * Close.
+     */
+    void close();
 }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
index 0cfc9de76b6..2643b37ddd4 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
@@ -32,8 +32,7 @@ import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncremental
 import org.apache.shardingsphere.data.pipeline.core.importer.connector.DataSourceImporterConnector;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
-import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
@@ -63,9 +62,9 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
     
     private final MigrationTaskConfiguration taskConfig;
     
-    private final Collection<InventoryTask> inventoryTasks = new LinkedList<>();
+    private final Collection<PipelineTask> inventoryTasks = new LinkedList<>();
     
-    private final Collection<IncrementalTask> incrementalTasks = new LinkedList<>();
+    private final Collection<PipelineTask> incrementalTasks = new LinkedList<>();
     
     private final AtomicLong processedRecordsCount = new AtomicLong(0);
     
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureInventoryIncrementalJobItemContext.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureInventoryIncrementalJobItemContext.java
index cb30e1c817a..e344c3123e9 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureInventoryIncrementalJobItemContext.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureInventoryIncrementalJobItemContext.java
@@ -24,8 +24,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.Pipelin
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
-import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
-import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
 
 import java.util.Collection;
@@ -42,12 +41,12 @@ public final class FixtureInventoryIncrementalJobItemContext implements Inventor
     }
     
     @Override
-    public Collection<InventoryTask> getInventoryTasks() {
+    public Collection<PipelineTask> getInventoryTasks() {
         return null;
     }
     
     @Override
-    public Collection<IncrementalTask> getIncrementalTasks() {
+    public Collection<PipelineTask> getIncrementalTasks() {
         return null;
     }
     
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
index 0de03d0f2fd..8aeb66607a5 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
@@ -75,12 +75,11 @@ class InventoryTaskTest {
         InventoryDumperConfiguration inventoryDumperConfig = createInventoryDumperConfiguration("t_non_exist", "t_non_exist");
         PipelineDataSourceWrapper dataSource = DATA_SOURCE_MANAGER.getDataSource(inventoryDumperConfig.getDataSourceConfig());
         PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSource);
-        try (
-                InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
-                        PipelineContextUtils.getPipelineChannelCreator(), new FixtureImporterConnector(), dataSource,
-                        metaDataLoader, PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext())) {
-            assertThrows(ExecutionException.class, () -> CompletableFuture.allOf(inventoryTask.start().toArray(new CompletableFuture[0])).get(10L, TimeUnit.SECONDS));
-        }
+        InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
+                PipelineContextUtils.getPipelineChannelCreator(), new FixtureImporterConnector(), dataSource,
+                metaDataLoader, PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext());
+        assertThrows(ExecutionException.class, () -> CompletableFuture.allOf(inventoryTask.start().toArray(new CompletableFuture[0])).get(10L, TimeUnit.SECONDS));
+        inventoryTask.close();
     }
     
     @Test
@@ -90,13 +89,12 @@ class InventoryTaskTest {
         InventoryDumperConfiguration inventoryDumperConfig = createInventoryDumperConfiguration("t_order", "t_order");
         PipelineDataSourceWrapper dataSource = DATA_SOURCE_MANAGER.getDataSource(inventoryDumperConfig.getDataSourceConfig());
         PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSource);
-        try (
-                InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
-                        PipelineContextUtils.getPipelineChannelCreator(), new FixtureImporterConnector(), dataSource,
-                        metaDataLoader, PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext())) {
-            CompletableFuture.allOf(inventoryTask.start().toArray(new CompletableFuture[0])).get(10L, TimeUnit.SECONDS);
-            assertThat(inventoryTask.getTaskProgress().getPosition(), instanceOf(IntegerPrimaryKeyPosition.class));
-        }
+        InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
+                PipelineContextUtils.getPipelineChannelCreator(), new FixtureImporterConnector(), dataSource,
+                metaDataLoader, PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext());
+        CompletableFuture.allOf(inventoryTask.start().toArray(new CompletableFuture[0])).get(10L, TimeUnit.SECONDS);
+        assertThat(inventoryTask.getTaskProgress().getPosition(), instanceOf(IntegerPrimaryKeyPosition.class));
+        inventoryTask.close();
     }
     
     private void initTableData(final DumperConfiguration dumperConfig) throws SQLException {