You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/08/06 04:55:39 UTC
[shardingsphere] branch master updated: Revise job progress persistence (#19909)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4bdefa673d8 Revise job progress persistence (#19909)
4bdefa673d8 is described below
commit 4bdefa673d88a362f52fc02550841d58f8b73fe0
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Sat Aug 6 12:55:34 2022 +0800
Revise job progress persistence (#19909)
---
.../listener/PipelineJobProgressListener.java} | 24 +++--------
.../pipeline/core/importer/AbstractImporter.java | 10 ++---
.../DefaultPipelineJobProgressListener.java} | 24 ++++-------
.../PipelineJobProgressPersistContext.java} | 7 +++-
.../PipelineJobProgressPersistService.java} | 48 +++++++++++-----------
.../data/pipeline/core/task/IncrementalTask.java | 11 ++---
.../data/pipeline/core/task/InventoryTask.java | 6 +--
.../scenario/rulealtered/RuleAlteredJob.java | 5 ++-
.../rulealtered/RuleAlteredJobPreparer.java | 6 +--
.../rulealtered/prepare/InventoryTaskSplitter.java | 6 +--
.../scaling/core/job/importer/ImporterFactory.java | 10 ++---
.../pipeline/mysql/importer/MySQLImporter.java | 6 +--
.../opengauss/importer/OpenGaussImporter.java | 6 +--
.../postgresql/importer/PostgreSQLImporter.java | 6 +--
.../api/impl/GovernanceRepositoryAPIImplTest.java | 6 +--
.../pipeline/core/fixture/FixtureImporter.java | 4 +-
...ava => FixturePipelineJobProgressListener.java} | 16 ++------
.../core/importer/AbstractImporterTest.java | 4 +-
.../pipeline/core/task/IncrementalTaskTest.java | 4 +-
.../data/pipeline/core/task/InventoryTaskTest.java | 6 +--
20 files changed, 93 insertions(+), 122 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistCallback.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressListener.java
similarity index 67%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistCallback.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressListener.java
index 09260f40aff..4ca18491ef8 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistCallback.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressListener.java
@@ -15,29 +15,15 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.job.persist;
+package org.apache.shardingsphere.data.pipeline.api.job.progress.listener;
/**
- * Job persist callback.
+ * Pipeline job progress listener.
*/
-public interface PipelineJobPersistCallback {
+public interface PipelineJobProgressListener {
/**
- * Get job id.
- *
- * @return job id
+ * Emit on progress updated.
*/
- String getJobId();
-
- /**
- * Get sharding item.
- *
- * @return sharding item
- */
- int getShardingItem();
-
- /**
- * Push persist event.
- */
- void pushPersistEvent();
+ void onProgressUpdated();
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
index 97f914398cd..f0e3081376d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
@@ -28,7 +28,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
@@ -63,15 +63,15 @@ public abstract class AbstractImporter extends AbstractLifecycleExecutor impleme
private final PipelineChannel channel;
- private final PipelineJobPersistCallback pipelineJobPersistCallback;
+ private final PipelineJobProgressListener jobProgressListener;
protected AbstractImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
- final PipelineJobPersistCallback pipelineJobPersistCallback) {
+ final PipelineJobProgressListener jobProgressListener) {
this.importerConfig = importerConfig;
this.dataSourceManager = dataSourceManager;
this.channel = channel;
pipelineSqlBuilder = PipelineSQLBuilderFactory.getInstance(importerConfig.getDataSourceConfig().getDatabaseType().getType());
- this.pipelineJobPersistCallback = pipelineJobPersistCallback;
+ this.jobProgressListener = jobProgressListener;
}
@Override
@@ -92,7 +92,7 @@ public abstract class AbstractImporter extends AbstractLifecycleExecutor impleme
rowCount += records.size();
flush(dataSourceManager.getDataSource(importerConfig.getDataSourceConfig()), records);
channel.ack(records);
- pipelineJobPersistCallback.pushPersistEvent();
+ jobProgressListener.onProgressUpdated();
if (0 == round % 50) {
log.info("importer write, round={}, rowCount={}", round, rowCount);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/persist/AsyncPipelineJobPersistCallback.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/DefaultPipelineJobProgressListener.java
similarity index 59%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/persist/AsyncPipelineJobPersistCallback.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/DefaultPipelineJobProgressListener.java
index 08b2b1336a6..97b55c4e48a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/persist/AsyncPipelineJobPersistCallback.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/DefaultPipelineJobProgressListener.java
@@ -15,34 +15,24 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.job.persist;
+package org.apache.shardingsphere.data.pipeline.core.job.progress.listener;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPersistService;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
/**
- * Async job process persist callback.
+ * Default pipeline job progress listener implementation.
*/
@RequiredArgsConstructor
-public final class AsyncPipelineJobPersistCallback implements PipelineJobPersistCallback {
+public final class DefaultPipelineJobProgressListener implements PipelineJobProgressListener {
private final String jobId;
private final int shardingItem;
@Override
- public String getJobId() {
- return jobId;
- }
-
- @Override
- public int getShardingItem() {
- return shardingItem;
- }
-
- @Override
- public void pushPersistEvent() {
- RuleAlteredJobPersistService.triggerPersist(jobId, shardingItem);
+ public void onProgressUpdated() {
+ PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistContext.java
similarity index 87%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistContext.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistContext.java
index ea590ea94f0..b62d0b35ea9 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistContext.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.job.persist;
+package org.apache.shardingsphere.data.pipeline.core.job.progress.persist;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@@ -23,9 +23,12 @@ import lombok.RequiredArgsConstructor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+/**
+ * Pipeline job progress persist context.
+ */
@Getter
@RequiredArgsConstructor
-public final class PipelineJobPersistContext {
+public final class PipelineJobProgressPersistContext {
private final String jobId;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPersistService.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
similarity index 66%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPersistService.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index e7bc4d46a32..8380f08f906 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPersistService.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -15,12 +15,13 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
+package org.apache.shardingsphere.data.pipeline.core.job.progress.persist;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistContext;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobCenter;
+import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobScheduler;
import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
import java.util.Collections;
@@ -33,17 +34,16 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
- * Rule altered job persist service.
+ * Pipeline job progress persist service.
*/
-
@Slf4j
-public final class RuleAlteredJobPersistService {
+public final class PipelineJobProgressPersistService {
- private static final Map<String, Map<Integer, PipelineJobPersistContext>> JOB_PERSIST_MAP = new ConcurrentHashMap<>();
+ private static final Map<String, Map<Integer, PipelineJobProgressPersistContext>> JOB_PROGRESS_PERSIST_MAP = new ConcurrentHashMap<>();
private static final GovernanceRepositoryAPI REPOSITORY_API = PipelineAPIFactory.getGovernanceRepositoryAPI();
- private static final ScheduledExecutorService JOB_PERSIST_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("scaling-job-schedule-%d"));
+ private static final ScheduledExecutorService JOB_PERSIST_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("pipeline-progress-persist-%d"));
private static final long DELAY_SECONDS = 1;
@@ -52,43 +52,43 @@ public final class RuleAlteredJobPersistService {
}
/**
- * Remove job schedule parameter by job id.
+ * Remove job progress persist context.
*
* @param jobId job id
*/
- public static void removeJobPersistParameter(final String jobId) {
- log.info("Remove job persist, job id: {}", jobId);
- JOB_PERSIST_MAP.remove(jobId);
+ public static void removeJobProgressPersistContext(final String jobId) {
+ log.info("Remove job progress persist context, jobId={}", jobId);
+ JOB_PROGRESS_PERSIST_MAP.remove(jobId);
}
/**
- * Add job schedule parameter.
+ * Add job progress persist context.
*
* @param jobId job id
* @param shardingItem sharding item
*/
- public static void addJobPersistParameter(final String jobId, final int shardingItem) {
- log.info("Add job schedule, jobId={}, shardingItem={}", jobId, shardingItem);
- JOB_PERSIST_MAP.computeIfAbsent(jobId, key -> new ConcurrentHashMap<>()).put(shardingItem, new PipelineJobPersistContext(jobId, shardingItem));
+ public static void addJobProgressPersistContext(final String jobId, final int shardingItem) {
+ log.info("Add job progress persist context, jobId={}, shardingItem={}", jobId, shardingItem);
+ JOB_PROGRESS_PERSIST_MAP.computeIfAbsent(jobId, key -> new ConcurrentHashMap<>()).put(shardingItem, new PipelineJobProgressPersistContext(jobId, shardingItem));
}
/**
- * Persist job process, may not be implemented immediately, depending on persist interval.
+ * Notify persist.
*
* @param jobId job id
* @param shardingItem sharding item
*/
- public static void triggerPersist(final String jobId, final int shardingItem) {
- Map<Integer, PipelineJobPersistContext> intervalParamMap = JOB_PERSIST_MAP.getOrDefault(jobId, Collections.emptyMap());
- PipelineJobPersistContext parameter = intervalParamMap.get(shardingItem);
- if (null == parameter) {
- log.debug("Persist interval parameter is null, jobId={}, shardingItem={}", jobId, shardingItem);
+ public static void notifyPersist(final String jobId, final int shardingItem) {
+ Map<Integer, PipelineJobProgressPersistContext> persistContextMap = JOB_PROGRESS_PERSIST_MAP.getOrDefault(jobId, Collections.emptyMap());
+ PipelineJobProgressPersistContext persistContext = persistContextMap.get(shardingItem);
+ if (null == persistContext) {
+ log.debug("persistContext is null, jobId={}, shardingItem={}", jobId, shardingItem);
return;
}
- parameter.getHasNewEvents().set(true);
+ persistContext.getHasNewEvents().set(true);
}
- private static void persist(final String jobId, final int shardingItem, final PipelineJobPersistContext persistContext) {
+ private static void persist(final String jobId, final int shardingItem, final PipelineJobProgressPersistContext persistContext) {
Long beforePersistingProgressMillis = persistContext.getBeforePersistingProgressMillis().get();
if ((null == beforePersistingProgressMillis || System.currentTimeMillis() - beforePersistingProgressMillis < TimeUnit.SECONDS.toMillis(DELAY_SECONDS))
&& !persistContext.getHasNewEvents().get()) {
@@ -116,7 +116,7 @@ public final class RuleAlteredJobPersistService {
@Override
public void run() {
- for (Entry<String, Map<Integer, PipelineJobPersistContext>> entry : JOB_PERSIST_MAP.entrySet()) {
+ for (Entry<String, Map<Integer, PipelineJobProgressPersistContext>> entry : JOB_PROGRESS_PERSIST_MAP.entrySet()) {
entry.getValue().forEach((shardingItem, persistContext) -> {
persist(entry.getKey(), shardingItem, persistContext);
});
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 55a8ab607a1..abafe1c9600 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChanne
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
@@ -68,14 +68,15 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements
public IncrementalTask(final int concurrency, final DumperConfiguration dumperConfig, final ImporterConfiguration importerConfig,
final PipelineChannelCreator pipelineChannelCreator, final PipelineDataSourceManager dataSourceManager,
- final PipelineTableMetaDataLoader sourceMetaDataLoader, final ExecuteEngine incrementalDumperExecuteEngine, final PipelineJobPersistCallback pipelineJobPersistCallback) {
+ final PipelineTableMetaDataLoader sourceMetaDataLoader, final ExecuteEngine incrementalDumperExecuteEngine,
+ final PipelineJobProgressListener jobProgressListener) {
this.incrementalDumperExecuteEngine = incrementalDumperExecuteEngine;
taskId = dumperConfig.getDataSourceName();
IngestPosition<?> position = dumperConfig.getPosition();
progress = createIncrementalTaskProgress(position);
channel = createChannel(concurrency, pipelineChannelCreator, progress);
dumper = DumperFactory.createIncrementalDumper(dumperConfig, position, channel, sourceMetaDataLoader);
- importers = createImporters(concurrency, importerConfig, dataSourceManager, channel, pipelineJobPersistCallback);
+ importers = createImporters(concurrency, importerConfig, dataSourceManager, channel, jobProgressListener);
}
private IncrementalTaskProgress createIncrementalTaskProgress(final IngestPosition<?> position) {
@@ -93,10 +94,10 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements
}
private Collection<Importer> createImporters(final int concurrency, final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
- final PipelineJobPersistCallback pipelineJobPersistCallback) {
+ final PipelineJobProgressListener jobProgressListener) {
Collection<Importer> result = new LinkedList<>();
for (int i = 0; i < concurrency; i++) {
- result.add(ImporterFactory.createImporter(importerConfig, dataSourceManager, channel, pipelineJobPersistCallback));
+ result.add(ImporterFactory.createImporter(importerConfig, dataSourceManager, channel, jobProgressListener));
}
return result;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index b04caf2da2b..85a1f9cdfde 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChanne
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
@@ -68,12 +68,12 @@ public final class InventoryTask extends AbstractLifecycleExecutor implements Pi
public InventoryTask(final InventoryDumperConfiguration inventoryDumperConfig, final ImporterConfiguration importerConfig,
final PipelineChannelCreator pipelineChannelCreator, final PipelineDataSourceManager dataSourceManager,
final DataSource sourceDataSource, final PipelineTableMetaDataLoader sourceMetaDataLoader,
- final ExecuteEngine importerExecuteEngine, final PipelineJobPersistCallback pipelineJobPersistCallback) {
+ final ExecuteEngine importerExecuteEngine, final PipelineJobProgressListener jobProgressListener) {
this.importerExecuteEngine = importerExecuteEngine;
taskId = generateTaskId(inventoryDumperConfig);
channel = createChannel(pipelineChannelCreator);
dumper = DumperFactory.createInventoryDumper(inventoryDumperConfig, channel, sourceDataSource, sourceMetaDataLoader);
- importer = ImporterFactory.createImporter(importerConfig, dataSourceManager, channel, pipelineJobPersistCallback);
+ importer = ImporterFactory.createImporter(importerConfig, dataSourceManager, channel, jobProgressListener);
position = inventoryDumperConfig.getPosition();
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
index f2f763efd69..ede339c7606 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
@@ -28,6 +28,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@@ -80,7 +81,7 @@ public final class RuleAlteredJob implements SimpleJob, PipelineJob {
RuleAlteredJobScheduler jobScheduler = new RuleAlteredJobScheduler(jobContext);
jobScheduler.start();
jobSchedulerMap.put(shardingItem, jobScheduler);
- RuleAlteredJobPersistService.addJobPersistParameter(jobId, shardingItem);
+ PipelineJobProgressPersistService.addJobProgressPersistContext(jobId, shardingItem);
}
/**
@@ -101,6 +102,6 @@ public final class RuleAlteredJob implements SimpleJob, PipelineJob {
each.stop();
}
jobSchedulerMap.clear();
- RuleAlteredJobPersistService.removeJobPersistParameter(jobId);
+ PipelineJobProgressPersistService.removeJobProgressPersistContext(jobId);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 7c84cf39468..a7e32400aaa 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -34,6 +34,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredExc
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.PositionInitializerFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.DefaultPipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparerFactory;
@@ -53,7 +54,6 @@ import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCre
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
-import org.apache.shardingsphere.scaling.core.job.persist.AsyncPipelineJobPersistCallback;
import javax.sql.DataSource;
import java.sql.SQLException;
@@ -192,9 +192,9 @@ public final class RuleAlteredJobPreparer {
PipelineDataSourceManager dataSourceManager = jobContext.getDataSourceManager();
taskConfig.getDumperConfig().setPosition(getIncrementalPosition(jobContext, taskConfig, dataSourceManager));
PipelineTableMetaDataLoader sourceMetaDataLoader = jobContext.getSourceMetaDataLoader();
- AsyncPipelineJobPersistCallback persistCallback = new AsyncPipelineJobPersistCallback(jobContext.getJobId(), jobContext.getShardingItem());
+ DefaultPipelineJobProgressListener jobProgressListener = new DefaultPipelineJobProgressListener(jobContext.getJobId(), jobContext.getShardingItem());
IncrementalTask incrementalTask = new IncrementalTask(taskConfig.getImporterConfig().getConcurrency(),
- taskConfig.getDumperConfig(), taskConfig.getImporterConfig(), pipelineChannelCreator, dataSourceManager, sourceMetaDataLoader, incrementalDumperExecuteEngine, persistCallback);
+ taskConfig.getDumperConfig(), taskConfig.getImporterConfig(), pipelineChannelCreator, dataSourceManager, sourceMetaDataLoader, incrementalDumperExecuteEngine, jobProgressListener);
jobContext.getIncrementalTasks().add(incrementalTask);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
index fb4609f1dec..7d681d90413 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
@@ -34,6 +34,7 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.DefaultPipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineIndexMetaData;
@@ -46,7 +47,6 @@ import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJ
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
-import org.apache.shardingsphere.scaling.core.job.persist.AsyncPipelineJobPersistCallback;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -77,9 +77,9 @@ public final class InventoryTaskSplitter {
DataSource dataSource = jobContext.getSourceDataSource();
PipelineTableMetaDataLoader metaDataLoader = jobContext.getSourceMetaDataLoader();
ExecuteEngine importerExecuteEngine = jobContext.getRuleAlteredContext().getImporterExecuteEngine();
- AsyncPipelineJobPersistCallback persistCallback = new AsyncPipelineJobPersistCallback(jobContext.getJobId(), jobContext.getShardingItem());
+ DefaultPipelineJobProgressListener jobProgressListener = new DefaultPipelineJobProgressListener(jobContext.getJobId(), jobContext.getShardingItem());
for (InventoryDumperConfiguration each : splitDumperConfig(jobContext, taskConfig.getDumperConfig())) {
- result.add(new InventoryTask(each, taskConfig.getImporterConfig(), pipelineChannelCreator, dataSourceManager, dataSource, metaDataLoader, importerExecuteEngine, persistCallback));
+ result.add(new InventoryTask(each, taskConfig.getImporterConfig(), pipelineChannelCreator, dataSourceManager, dataSource, metaDataLoader, importerExecuteEngine, jobProgressListener));
}
return result;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterFactory.java
index 16999111f08..fbe1bfeb305 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterFactory.java
@@ -22,7 +22,7 @@ import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
@@ -42,16 +42,16 @@ public final class ImporterFactory {
* @param importerConfig importer configuration
* @param dataSourceManager data source manager
* @param channel channel
- * @param pipelineJobPersistCallback job persist callback
+ * @param jobProgressListener job progress listener
* @return importer
*/
@SneakyThrows(ReflectiveOperationException.class)
public static Importer createImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
- final PipelineJobPersistCallback pipelineJobPersistCallback) {
+ final PipelineJobProgressListener jobProgressListener) {
String databaseType = importerConfig.getDataSourceConfig().getDatabaseType().getType();
ScalingEntry scalingEntry = ScalingEntryFactory.getInstance(databaseType);
Constructor<? extends Importer> constructor = scalingEntry.getImporterClass().getConstructor(ImporterConfiguration.class, PipelineDataSourceManager.class, PipelineChannel.class,
- PipelineJobPersistCallback.class);
- return constructor.newInstance(importerConfig, dataSourceManager, channel, pipelineJobPersistCallback);
+ PipelineJobProgressListener.class);
+ return constructor.newInstance(importerConfig, dataSourceManager, channel, jobProgressListener);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
index 002f0869450..09af98ea2fe 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.mysql.importer;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
@@ -29,8 +29,8 @@ import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
public final class MySQLImporter extends AbstractImporter {
public MySQLImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
- final PipelineJobPersistCallback pipelineJobPersistCallback) {
- super(importerConfig, dataSourceManager, channel, pipelineJobPersistCallback);
+ final PipelineJobProgressListener jobProgressListener) {
+ super(importerConfig, dataSourceManager, channel, jobProgressListener);
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java
index c864e5c04a4..831c3510b32 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.opengauss.importer;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
@@ -30,8 +30,8 @@ import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
public final class OpenGaussImporter extends AbstractImporter {
public OpenGaussImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
- final PipelineJobPersistCallback pipelineJobPersistCallback) {
- super(importerConfig, dataSourceManager, channel, pipelineJobPersistCallback);
+ final PipelineJobProgressListener jobProgressListener) {
+ super(importerConfig, dataSourceManager, channel, jobProgressListener);
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java
index 26c1bac7aa6..571915d5c16 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.importer;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
@@ -30,8 +30,8 @@ import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
public final class PostgreSQLImporter extends AbstractImporter {
public PostgreSQLImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
- final PipelineJobPersistCallback pipelineJobPersistCallback) {
- super(importerConfig, dataSourceManager, channel, pipelineJobPersistCallback);
+ final PipelineJobProgressListener jobProgressListener) {
+ super(importerConfig, dataSourceManager, channel, jobProgressListener);
}
@Override
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index 7a2100d2e12..83615933662 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -28,7 +28,7 @@ import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobPersistCallback;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgressSwapper;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
@@ -160,7 +160,7 @@ public final class GovernanceRepositoryAPIImplTest {
PipelineDataSourceWrapper dataSource = mock(PipelineDataSourceWrapper.class);
PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource);
return new InventoryTask(dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelCreator(),
- new PipelineDataSourceManager(), dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobPersistCallback());
+ new PipelineDataSourceManager(), dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobProgressListener());
}
private IncrementalTask mockIncrementalTask(final TaskConfiguration taskConfig) {
@@ -168,6 +168,6 @@ public final class GovernanceRepositoryAPIImplTest {
dumperConfig.setPosition(new PlaceholderPosition());
PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
return new IncrementalTask(3, dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelCreator(),
- new PipelineDataSourceManager(), metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobPersistCallback());
+ new PipelineDataSourceManager(), metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobProgressListener());
}
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
index 4932eb50cea..fdbd30b03dc 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
@@ -19,14 +19,14 @@ package org.apache.shardingsphere.data.pipeline.core.fixture;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
public final class FixtureImporter implements Importer {
public FixtureImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
- final PipelineJobPersistCallback pipelineJobPersistCallback) {
+ final PipelineJobProgressListener jobProgressListener) {
}
@Override
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobPersistCallback.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobProgressListener.java
similarity index 70%
rename from shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobPersistCallback.java
rename to shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobProgressListener.java
index 00e1a3f30d8..8a2a7a11702 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobPersistCallback.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobProgressListener.java
@@ -17,21 +17,11 @@
package org.apache.shardingsphere.data.pipeline.core.fixture;
-import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-public final class FixturePipelineJobPersistCallback implements PipelineJobPersistCallback {
+public final class FixturePipelineJobProgressListener implements PipelineJobProgressListener {
@Override
- public String getJobId() {
- return null;
- }
-
- @Override
- public int getShardingItem() {
- return 0;
- }
-
- @Override
- public void pushPersistEvent() {
+ public void onProgressUpdated() {
}
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
index 22b9c7c2b70..92d5a4632b1 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
@@ -30,7 +30,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobPersistCallback;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
import org.junit.Before;
import org.junit.Test;
@@ -82,7 +82,7 @@ public final class AbstractImporterTest {
@Before
public void setUp() throws SQLException {
- jdbcImporter = new AbstractImporter(mockImporterConfiguration(), dataSourceManager, channel, new FixturePipelineJobPersistCallback()) {
+ jdbcImporter = new AbstractImporter(mockImporterConfiguration(), dataSourceManager, channel, new FixturePipelineJobProgressListener()) {
@Override
protected String getSchemaName(final String logicTableName) {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index af11ca297ec..e81a8913b09 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -22,7 +22,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobPersistCallback;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
@@ -55,7 +55,7 @@ public final class IncrementalTaskTest {
PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
PipelineContextUtil.getPipelineChannelCreator(),
- new PipelineDataSourceManager(), metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobPersistCallback());
+ new PipelineDataSourceManager(), metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobProgressListener());
}
@Test
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index dc14ac4d84c..afde39ffba6 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -24,7 +24,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobPersistCallback;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
@@ -73,7 +73,7 @@ public final class InventoryTaskTest {
try (
InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
PipelineContextUtil.getPipelineChannelCreator(),
- DATA_SOURCE_MANAGER, dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobPersistCallback())) {
+ DATA_SOURCE_MANAGER, dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobProgressListener())) {
inventoryTask.start();
}
}
@@ -88,7 +88,7 @@ public final class InventoryTaskTest {
try (
InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
PipelineContextUtil.getPipelineChannelCreator(),
- new PipelineDataSourceManager(), dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobPersistCallback())) {
+ new PipelineDataSourceManager(), dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobProgressListener())) {
inventoryTask.start();
assertThat(inventoryTask.getProgress().getPosition(), instanceOf(IntegerPrimaryKeyPosition.class));
}