You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/05/31 05:52:46 UTC
[shardingsphere] branch master updated: Use interface PipelineTask instead of implementation as possible (#25955)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 2932943a6da Use interface PipelineTask instead of implementation as possible (#25955)
2932943a6da is described below
commit 2932943a6da367fadadac0fb9f60e197ca066168
Author: 吴伟杰 <wu...@apache.org>
AuthorDate: Wed May 31 13:52:29 2023 +0800
Use interface PipelineTask instead of implementation as possible (#25955)
---
.../cdc/context/job/CDCJobItemContext.java | 7 +++----
.../pipeline/cdc/core/task/CDCTasksRunner.java | 5 ++---
.../api/task/progress/IncrementalTaskProgress.java | 6 +-----
.../pipeline/api/task/progress/TaskProgress.java | 9 ++++++++
.../AbstractInventoryIncrementalJobAPIImpl.java | 14 ++++++-------
.../InventoryIncrementalJobItemContext.java | 7 +++----
.../job/progress/PipelineJobProgressDetector.java | 7 ++++---
.../data/pipeline/core/task/IncrementalTask.java | 2 +-
.../core/task/InventoryIncrementalTasksRunner.java | 14 ++++++-------
.../data/pipeline/core/task/InventoryTask.java | 2 +-
.../data/pipeline/core/task/PipelineTask.java | 5 +++++
.../migration/context/MigrationJobItemContext.java | 7 +++----
.../FixtureInventoryIncrementalJobItemContext.java | 7 +++----
.../data/pipeline/core/task/InventoryTaskTest.java | 24 ++++++++++------------
14 files changed, 60 insertions(+), 56 deletions(-)
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
index e5111de8626..f42ef50242b 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
@@ -34,8 +34,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
-import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
import java.util.Collection;
@@ -68,9 +67,9 @@ public final class CDCJobItemContext implements InventoryIncrementalJobItemConte
private final ImporterConnector importerConnector;
- private final Collection<InventoryTask> inventoryTasks = new LinkedList<>();
+ private final Collection<PipelineTask> inventoryTasks = new LinkedList<>();
- private final Collection<IncrementalTask> incrementalTasks = new LinkedList<>();
+ private final Collection<PipelineTask> incrementalTasks = new LinkedList<>();
private final AtomicLong processedRecordsCount = new AtomicLong(0);
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java
index 1497458c92d..9f78ddf90b8 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java
@@ -18,9 +18,8 @@
package org.apache.shardingsphere.data.pipeline.cdc.core.task;
import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryIncrementalTasksRunner;
-import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import java.util.Collection;
@@ -29,7 +28,7 @@ import java.util.Collection;
*/
public final class CDCTasksRunner extends InventoryIncrementalTasksRunner {
- public CDCTasksRunner(final PipelineJobItemContext jobItemContext, final Collection<InventoryTask> inventoryTasks, final Collection<IncrementalTask> incrementalTasks) {
+ public CDCTasksRunner(final PipelineJobItemContext jobItemContext, final Collection<PipelineTask> inventoryTasks, final Collection<PipelineTask> incrementalTasks) {
super(jobItemContext, inventoryTasks, incrementalTasks);
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java
index d50e5755150..04870659724 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java
@@ -35,11 +35,7 @@ public final class IncrementalTaskProgress implements TaskProgress {
incrementalTaskDelay.set(new IncrementalTaskDelay());
}
- /**
- * Get position.
- *
- * @return position
- */
+ @Override
public IngestPosition getPosition() {
return position.get();
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/TaskProgress.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/TaskProgress.java
index aec203b033d..957c03a8d9a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/TaskProgress.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/TaskProgress.java
@@ -17,8 +17,17 @@
package org.apache.shardingsphere.data.pipeline.api.task.progress;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+
/**
* Task progress.
*/
public interface TaskProgress {
+
+ /**
+ * Get position.
+ *
+ * @return position
+ */
+ IngestPosition getPosition();
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index cb65c7538b0..73ff026a3ae 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -33,6 +33,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.JobOffsetInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.TableBasedPipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
@@ -47,8 +48,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInvent
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobOffsetInfo;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobOffsetInfoSwapper;
-import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
-import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
@@ -158,14 +158,14 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(jobId, context.getShardingItem(), value);
}
- private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final Collection<IncrementalTask> incrementalTasks) {
- return new JobItemIncrementalTasksProgress(incrementalTasks.isEmpty() ? null : incrementalTasks.iterator().next().getTaskProgress());
+ private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final Collection<PipelineTask> incrementalTasks) {
+ return new JobItemIncrementalTasksProgress(incrementalTasks.isEmpty() ? null : (IncrementalTaskProgress) incrementalTasks.iterator().next().getTaskProgress());
}
- private JobItemInventoryTasksProgress getInventoryTasksProgress(final Collection<InventoryTask> inventoryTasks) {
+ private JobItemInventoryTasksProgress getInventoryTasksProgress(final Collection<PipelineTask> inventoryTasks) {
Map<String, InventoryTaskProgress> inventoryTaskProgressMap = new HashMap<>();
- for (InventoryTask each : inventoryTasks) {
- inventoryTaskProgressMap.put(each.getTaskId(), each.getTaskProgress());
+ for (PipelineTask each : inventoryTasks) {
+ inventoryTaskProgressMap.put(each.getTaskId(), (InventoryTaskProgress) each.getTaskProgress());
}
return new JobItemInventoryTasksProgress(inventoryTaskProgressMap);
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
index fb9361baff4..c77a33219b2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
@@ -21,8 +21,7 @@ import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContex
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
-import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
import java.util.Collection;
@@ -40,14 +39,14 @@ public interface InventoryIncrementalJobItemContext extends PipelineJobItemConte
*
* @return inventory tasks
*/
- Collection<InventoryTask> getInventoryTasks();
+ Collection<PipelineTask> getInventoryTasks();
/**
* Get incremental tasks.
*
* @return incremental tasks
*/
- Collection<IncrementalTask> getIncrementalTasks();
+ Collection<PipelineTask> getIncrementalTasks();
/**
* Get init progress.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetector.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetector.java
index f583b736789..d018ddf09ed 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetector.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/PipelineJobProgressDetector.java
@@ -21,7 +21,8 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
-import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import java.util.Collection;
@@ -38,10 +39,10 @@ public final class PipelineJobProgressDetector {
* @param inventoryTasks to check inventory tasks
* @return is finished
*/
- public static boolean allInventoryTasksFinished(final Collection<InventoryTask> inventoryTasks) {
+ public static boolean allInventoryTasksFinished(final Collection<PipelineTask> inventoryTasks) {
if (inventoryTasks.isEmpty()) {
log.warn("inventoryTasks is empty");
}
- return inventoryTasks.stream().allMatch(each -> each.getTaskProgress().getPosition() instanceof FinishedPosition);
+ return inventoryTasks.stream().allMatch(each -> ((InventoryTaskProgress) each.getTaskProgress()).getPosition() instanceof FinishedPosition);
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 75426260d90..e028e86a492 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -54,7 +54,7 @@ import java.util.concurrent.CompletableFuture;
*/
@Slf4j
@ToString(exclude = {"incrementalExecuteEngine", "channel", "dumper", "importers", "taskProgress"})
-public final class IncrementalTask implements PipelineTask, AutoCloseable {
+public final class IncrementalTask implements PipelineTask {
@Getter
private final String taskId;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
index 13432217b60..e5dc2fceb3b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
@@ -45,13 +45,13 @@ public class InventoryIncrementalTasksRunner implements PipelineTasksRunner {
@Getter
private final PipelineJobItemContext jobItemContext;
- private final Collection<InventoryTask> inventoryTasks;
+ private final Collection<PipelineTask> inventoryTasks;
- private final Collection<IncrementalTask> incrementalTasks;
+ private final Collection<PipelineTask> incrementalTasks;
private final PipelineJobAPI jobAPI;
- public InventoryIncrementalTasksRunner(final PipelineJobItemContext jobItemContext, final Collection<InventoryTask> inventoryTasks, final Collection<IncrementalTask> incrementalTasks) {
+ public InventoryIncrementalTasksRunner(final PipelineJobItemContext jobItemContext, final Collection<PipelineTask> inventoryTasks, final Collection<PipelineTask> incrementalTasks) {
this.jobItemContext = jobItemContext;
this.inventoryTasks = inventoryTasks;
this.incrementalTasks = incrementalTasks;
@@ -61,11 +61,11 @@ public class InventoryIncrementalTasksRunner implements PipelineTasksRunner {
@Override
public void stop() {
jobItemContext.setStopping(true);
- for (InventoryTask each : inventoryTasks) {
+ for (PipelineTask each : inventoryTasks) {
each.stop();
each.close();
}
- for (IncrementalTask each : incrementalTasks) {
+ for (PipelineTask each : incrementalTasks) {
each.stop();
each.close();
}
@@ -88,7 +88,7 @@ public class InventoryIncrementalTasksRunner implements PipelineTasksRunner {
private synchronized void executeInventoryTask() {
updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INVENTORY_TASK);
Collection<CompletableFuture<?>> futures = new LinkedList<>();
- for (InventoryTask each : inventoryTasks) {
+ for (PipelineTask each : inventoryTasks) {
if (each.getTaskProgress().getPosition() instanceof FinishedPosition) {
continue;
}
@@ -113,7 +113,7 @@ public class InventoryIncrementalTasksRunner implements PipelineTasksRunner {
}
updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INCREMENTAL_TASK);
Collection<CompletableFuture<?>> futures = new LinkedList<>();
- for (IncrementalTask each : incrementalTasks) {
+ for (PipelineTask each : incrementalTasks) {
if (each.getTaskProgress().getPosition() instanceof FinishedPosition) {
continue;
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 852230ed3c0..4adad77d972 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -50,7 +50,7 @@ import java.util.concurrent.atomic.AtomicReference;
*/
@ToString(exclude = {"inventoryDumperExecuteEngine", "inventoryImporterExecuteEngine", "channel", "dumper", "importer"})
@Slf4j
-public final class InventoryTask implements PipelineTask, AutoCloseable {
+public final class InventoryTask implements PipelineTask {
@Getter
private final String taskId;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
index 98690873d30..1b090ee9b2a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
@@ -52,4 +52,9 @@ public interface PipelineTask {
* @return task progress
*/
TaskProgress getTaskProgress();
+
+ /**
+ * Close.
+ */
+ void close();
}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
index 0cfc9de76b6..2643b37ddd4 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
@@ -32,8 +32,7 @@ import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncremental
import org.apache.shardingsphere.data.pipeline.core.importer.connector.DataSourceImporterConnector;
import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
-import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
@@ -63,9 +62,9 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
private final MigrationTaskConfiguration taskConfig;
- private final Collection<InventoryTask> inventoryTasks = new LinkedList<>();
+ private final Collection<PipelineTask> inventoryTasks = new LinkedList<>();
- private final Collection<IncrementalTask> incrementalTasks = new LinkedList<>();
+ private final Collection<PipelineTask> incrementalTasks = new LinkedList<>();
private final AtomicLong processedRecordsCount = new AtomicLong(0);
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureInventoryIncrementalJobItemContext.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureInventoryIncrementalJobItemContext.java
index cb30e1c817a..e344c3123e9 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureInventoryIncrementalJobItemContext.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureInventoryIncrementalJobItemContext.java
@@ -24,8 +24,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.Pipelin
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
-import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
-import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
import java.util.Collection;
@@ -42,12 +41,12 @@ public final class FixtureInventoryIncrementalJobItemContext implements Inventor
}
@Override
- public Collection<InventoryTask> getInventoryTasks() {
+ public Collection<PipelineTask> getInventoryTasks() {
return null;
}
@Override
- public Collection<IncrementalTask> getIncrementalTasks() {
+ public Collection<PipelineTask> getIncrementalTasks() {
return null;
}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
index 0de03d0f2fd..8aeb66607a5 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
@@ -75,12 +75,11 @@ class InventoryTaskTest {
InventoryDumperConfiguration inventoryDumperConfig = createInventoryDumperConfiguration("t_non_exist", "t_non_exist");
PipelineDataSourceWrapper dataSource = DATA_SOURCE_MANAGER.getDataSource(inventoryDumperConfig.getDataSourceConfig());
PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSource);
- try (
- InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
- PipelineContextUtils.getPipelineChannelCreator(), new FixtureImporterConnector(), dataSource,
- metaDataLoader, PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext())) {
- assertThrows(ExecutionException.class, () -> CompletableFuture.allOf(inventoryTask.start().toArray(new CompletableFuture[0])).get(10L, TimeUnit.SECONDS));
- }
+ InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
+ PipelineContextUtils.getPipelineChannelCreator(), new FixtureImporterConnector(), dataSource,
+ metaDataLoader, PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext());
+ assertThrows(ExecutionException.class, () -> CompletableFuture.allOf(inventoryTask.start().toArray(new CompletableFuture[0])).get(10L, TimeUnit.SECONDS));
+ inventoryTask.close();
}
@Test
@@ -90,13 +89,12 @@ class InventoryTaskTest {
InventoryDumperConfiguration inventoryDumperConfig = createInventoryDumperConfiguration("t_order", "t_order");
PipelineDataSourceWrapper dataSource = DATA_SOURCE_MANAGER.getDataSource(inventoryDumperConfig.getDataSourceConfig());
PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSource);
- try (
- InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
- PipelineContextUtils.getPipelineChannelCreator(), new FixtureImporterConnector(), dataSource,
- metaDataLoader, PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext())) {
- CompletableFuture.allOf(inventoryTask.start().toArray(new CompletableFuture[0])).get(10L, TimeUnit.SECONDS);
- assertThat(inventoryTask.getTaskProgress().getPosition(), instanceOf(IntegerPrimaryKeyPosition.class));
- }
+ InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
+ PipelineContextUtils.getPipelineChannelCreator(), new FixtureImporterConnector(), dataSource,
+ metaDataLoader, PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext());
+ CompletableFuture.allOf(inventoryTask.start().toArray(new CompletableFuture[0])).get(10L, TimeUnit.SECONDS);
+ assertThat(inventoryTask.getTaskProgress().getPosition(), instanceOf(IntegerPrimaryKeyPosition.class));
+ inventoryTask.close();
}
private void initTableData(final DumperConfiguration dumperConfig) throws SQLException {