You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/08/01 03:01:30 UTC
[shardingsphere] branch master updated: Extract job progress persistence from RuleAlteredJobSchedulerCenter (#19695)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 1633afeefe7 Extract job progress persistence from RuleAlteredJobSchedulerCenter (#19695)
1633afeefe7 is described below
commit 1633afeefe71419b052911baa357021c4f3e77d8
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Mon Aug 1 11:01:23 2022 +0800
Extract job progress persistence from RuleAlteredJobSchedulerCenter (#19695)
---
.../job/persist/PipelineJobPersistCallback.java | 41 +++----
.../api/job/persist/PipelineJobPersistContext.java | 30 ++----
.../pipeline/core/importer/AbstractImporter.java | 8 +-
.../data/pipeline/core/task/IncrementalTask.java | 10 +-
.../data/pipeline/core/task/InventoryTask.java | 5 +-
.../rulealtered/RuleAlteredJobPersistService.java | 119 +++++++++++++++++++++
.../rulealtered/RuleAlteredJobPreparer.java | 19 ++--
.../rulealtered/RuleAlteredJobSchedulerCenter.java | 37 ++-----
.../rulealtered/prepare/InventoryTaskSplitter.java | 4 +-
.../scaling/core/job/importer/ImporterFactory.java | 10 +-
.../persist/AsyncPipelineJobPersistCallback.java | 30 +++---
.../pipeline/mysql/importer/MySQLImporter.java | 6 +-
.../opengauss/importer/OpenGaussImporter.java | 6 +-
.../postgresql/importer/PostgreSQLImporter.java | 6 +-
.../data/pipeline/cases/base/BaseITCase.java | 11 +-
.../api/impl/GovernanceRepositoryAPIImplTest.java | 5 +-
.../pipeline/core/fixture/FixtureImporter.java | 4 +-
...java => FixturePipelineJobPersistCallback.java} | 19 ++--
.../core/importer/AbstractImporterTest.java | 3 +-
.../pipeline/core/task/IncrementalTaskTest.java | 3 +-
.../data/pipeline/core/task/InventoryTaskTest.java | 5 +-
21 files changed, 253 insertions(+), 128 deletions(-)
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistCallback.java
similarity index 51%
copy from shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistCallback.java
index 214ad50cccf..09260f40aff 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistCallback.java
@@ -15,28 +15,29 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.fixture;
+package org.apache.shardingsphere.data.pipeline.api.job.persist;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
-
-public final class FixtureImporter implements Importer {
-
- public FixtureImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
- }
+/**
+ * Job persist callback.
+ */
+public interface PipelineJobPersistCallback {
- @Override
- public void start() {
- }
+ /**
+ * Get job id.
+ *
+ * @return job id
+ */
+ String getJobId();
- @Override
- public void stop() {
- }
+ /**
+ * Get sharding item.
+ *
+ * @return sharding item
+ */
+ int getShardingItem();
- @Override
- public void run() {
- start();
- }
+ /**
+ * Push persist event.
+ */
+ void pushPersistEvent();
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistContext.java
similarity index 51%
copy from shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistContext.java
index 214ad50cccf..2f0b185e097 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistContext.java
@@ -15,28 +15,20 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.fixture;
+package org.apache.shardingsphere.data.pipeline.api.job.persist;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
-public final class FixtureImporter implements Importer {
-
- public FixtureImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
- }
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Getter
+@RequiredArgsConstructor
+public final class PipelineJobPersistContext {
- @Override
- public void start() {
- }
+ private final String jobId;
- @Override
- public void stop() {
- }
+ private final int shardingItem;
- @Override
- public void run() {
- start();
- }
+ private final AtomicBoolean alreadyPersisted = new AtomicBoolean(false);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
index c606d4ef103..97f914398cd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
@@ -28,6 +28,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
@@ -62,11 +63,15 @@ public abstract class AbstractImporter extends AbstractLifecycleExecutor impleme
private final PipelineChannel channel;
- protected AbstractImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
+ private final PipelineJobPersistCallback pipelineJobPersistCallback;
+
+ protected AbstractImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
+ final PipelineJobPersistCallback pipelineJobPersistCallback) {
this.importerConfig = importerConfig;
this.dataSourceManager = dataSourceManager;
this.channel = channel;
pipelineSqlBuilder = PipelineSQLBuilderFactory.getInstance(importerConfig.getDataSourceConfig().getDatabaseType().getType());
+ this.pipelineJobPersistCallback = pipelineJobPersistCallback;
}
@Override
@@ -87,6 +92,7 @@ public abstract class AbstractImporter extends AbstractLifecycleExecutor impleme
rowCount += records.size();
flush(dataSourceManager.getDataSource(importerConfig.getDataSourceConfig()), records);
channel.ack(records);
+ pipelineJobPersistCallback.pushPersistEvent();
if (0 == round % 50) {
log.info("importer write, round={}, rowCount={}", round, rowCount);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 568c0620242..161ed5fdfbb 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChanne
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
@@ -67,7 +68,7 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements
public IncrementalTask(final int concurrency, final DumperConfiguration dumperConfig, final ImporterConfiguration importerConfig,
final PipelineChannelCreator pipelineChannelCreator, final PipelineDataSourceManager dataSourceManager,
- final PipelineTableMetaDataLoader sourceMetaDataLoader, final ExecuteEngine incrementalDumperExecuteEngine) {
+ final PipelineTableMetaDataLoader sourceMetaDataLoader, final ExecuteEngine incrementalDumperExecuteEngine, final PipelineJobPersistCallback pipelineJobPersistCallback) {
this.incrementalDumperExecuteEngine = incrementalDumperExecuteEngine;
taskId = dumperConfig.getDataSourceName();
progress = new IncrementalTaskProgress();
@@ -75,7 +76,7 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements
progress.setPosition(position);
channel = createChannel(concurrency, pipelineChannelCreator, progress);
dumper = DumperFactory.createIncrementalDumper(dumperConfig, position, channel, sourceMetaDataLoader);
- importers = createImporters(concurrency, importerConfig, dataSourceManager, channel);
+ importers = createImporters(concurrency, importerConfig, dataSourceManager, channel, pipelineJobPersistCallback);
}
@Override
@@ -86,10 +87,11 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements
waitForResult(future);
}
- private Collection<Importer> createImporters(final int concurrency, final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
+ private Collection<Importer> createImporters(final int concurrency, final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
+ final PipelineJobPersistCallback pipelineJobPersistCallback) {
Collection<Importer> result = new LinkedList<>();
for (int i = 0; i < concurrency; i++) {
- result.add(ImporterFactory.createImporter(importerConfig, dataSourceManager, channel));
+ result.add(ImporterFactory.createImporter(importerConfig, dataSourceManager, channel, pipelineJobPersistCallback));
}
return result;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index ea115a4b734..b04caf2da2b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChanne
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
@@ -67,12 +68,12 @@ public final class InventoryTask extends AbstractLifecycleExecutor implements Pi
public InventoryTask(final InventoryDumperConfiguration inventoryDumperConfig, final ImporterConfiguration importerConfig,
final PipelineChannelCreator pipelineChannelCreator, final PipelineDataSourceManager dataSourceManager,
final DataSource sourceDataSource, final PipelineTableMetaDataLoader sourceMetaDataLoader,
- final ExecuteEngine importerExecuteEngine) {
+ final ExecuteEngine importerExecuteEngine, final PipelineJobPersistCallback pipelineJobPersistCallback) {
this.importerExecuteEngine = importerExecuteEngine;
taskId = generateTaskId(inventoryDumperConfig);
channel = createChannel(pipelineChannelCreator);
dumper = DumperFactory.createInventoryDumper(inventoryDumperConfig, channel, sourceDataSource, sourceMetaDataLoader);
- importer = ImporterFactory.createImporter(importerConfig, dataSourceManager, channel);
+ importer = ImporterFactory.createImporter(importerConfig, dataSourceManager, channel, pipelineJobPersistCallback);
position = inventoryDumperConfig.getPosition();
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPersistService.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPersistService.java
new file mode 100644
index 00000000000..dcda0743b23
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPersistService.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistContext;
+import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+/**
+ * Rule altered job persist service.
+ *
+ * <p>Keeps one {@link PipelineJobPersistContext} per job id and sharding item. A single-threaded scheduled task
+ * (first run after 5 seconds, then 1 second after each run finishes) persists the progress of every context
+ * whose {@code alreadyPersisted} flag is {@code false}.</p>
+ */
+@Slf4j
+public final class RuleAlteredJobPersistService {
+
+    private static final Map<String, Map<Integer, PipelineJobPersistContext>> JOB_PERSIST_MAP = new ConcurrentHashMap<>();
+
+    private static final GovernanceRepositoryAPI REPOSITORY_API = PipelineAPIFactory.getGovernanceRepositoryAPI();
+
+    private static final ScheduledExecutorService JOB_PERSIST_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("scaling-job-schedule-%d"));
+
+    static {
+        JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new PersistJobContextRunnable(), 5, 1, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Remove job persist parameter by job id.
+     *
+     * @param jobId job id
+     */
+    public static void removeJobPersistParameter(final String jobId) {
+        log.info("Remove job persist, job id: {}", jobId);
+        JOB_PERSIST_MAP.remove(jobId);
+    }
+
+    /**
+     * Add job persist parameter.
+     *
+     * <p>Registers a fresh persist context for the sharding item, replacing any previous one.</p>
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     */
+    public static void addJobPersistParameter(final String jobId, final int shardingItem) {
+        log.info("Add job schedule, jobId={}, shardingItem={}", jobId, shardingItem);
+        JOB_PERSIST_MAP.computeIfAbsent(jobId, key -> new ConcurrentHashMap<>()).put(shardingItem, new PipelineJobPersistContext(jobId, shardingItem));
+    }
+
+    /**
+     * Request persistence of job progress.
+     *
+     * <p>Only marks the sharding item as pending. The write itself is done by the scheduled task,
+     * so it may not happen immediately. Does nothing if the sharding item is not registered.</p>
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     */
+    public static void triggerPersist(final String jobId, final int shardingItem) {
+        Map<Integer, PipelineJobPersistContext> intervalParamMap = JOB_PERSIST_MAP.getOrDefault(jobId, Collections.emptyMap());
+        PipelineJobPersistContext parameter = intervalParamMap.get(shardingItem);
+        if (null == parameter) {
+            log.debug("Persist interval parameter is null, jobId={}, shardingItem={}", jobId, shardingItem);
+            return;
+        }
+        parameter.getAlreadyPersisted().compareAndSet(true, false);
+    }
+
+    /**
+     * Persist the progress of one sharding item if its scheduler is still registered.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @param persistTimeMillis start time of the current persist round, used for logging only
+     */
+    private static void persist(final String jobId, final int shardingItem, final long persistTimeMillis) {
+        RuleAlteredJobScheduler scheduler = RuleAlteredJobSchedulerCenter.getJobSchedulerMap(jobId).get(shardingItem);
+        if (null == scheduler) {
+            log.warn("job schedule not exists, job id: {}, sharding item: {}", jobId, shardingItem);
+            return;
+        }
+        log.info("execute persist, job id={}, sharding item={}, persistTimeMillis={}", jobId, shardingItem, persistTimeMillis);
+        REPOSITORY_API.persistJobProgress(scheduler.getJobContext());
+    }
+
+    /**
+     * Scheduled task that persists every sharding item with a pending persist request.
+     */
+    private static final class PersistJobContextRunnable implements Runnable {
+
+        @Override
+        public void run() {
+            long currentTimeMillis = System.currentTimeMillis();
+            for (Entry<String, Map<Integer, PipelineJobPersistContext>> entry : JOB_PERSIST_MAP.entrySet()) {
+                entry.getValue().forEach((shardingItem, param) -> persistIfPending(entry.getKey(), shardingItem, currentTimeMillis, param));
+            }
+        }
+
+        private void persistIfPending(final String jobId, final int shardingItem, final long persistTimeMillis, final PipelineJobPersistContext param) {
+            AtomicBoolean alreadyPersisted = param.getAlreadyPersisted();
+            // Claim the pending request BEFORE writing: a triggerPersist() that arrives while the write is in
+            // flight then re-arms the flag for the next round instead of being overwritten after the write.
+            if (!alreadyPersisted.compareAndSet(false, true)) {
+                return;
+            }
+            try {
+                persist(jobId, shardingItem, persistTimeMillis);
+                // CHECKSTYLE:OFF
+            } catch (final Exception ex) {
+                // CHECKSTYLE:ON
+                // Never let an exception escape run(): scheduleWithFixedDelay would cancel all further executions.
+                // Re-arm the flag so the failed write is retried in the next round.
+                alreadyPersisted.set(false);
+                log.error("persist job progress failed, jobId={}, shardingItem={}", jobId, shardingItem, ex);
+            }
+        }
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 6c56ec6179c..03404707be8 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -17,13 +17,6 @@
package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
-import java.sql.SQLException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
@@ -61,6 +54,15 @@ import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.lock.LockScope;
import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
import org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
+import org.apache.shardingsphere.scaling.core.job.persist.AsyncPipelineJobPersistCallback;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
/**
* Rule altered job preparer.
@@ -191,8 +193,9 @@ public final class RuleAlteredJobPreparer {
PipelineDataSourceManager dataSourceManager = jobContext.getDataSourceManager();
taskConfig.getDumperConfig().setPosition(getIncrementalPosition(jobContext, taskConfig, dataSourceManager));
PipelineTableMetaDataLoader sourceMetaDataLoader = jobContext.getSourceMetaDataLoader();
+ AsyncPipelineJobPersistCallback persistCallback = new AsyncPipelineJobPersistCallback(jobContext.getJobId(), jobContext.getShardingItem());
IncrementalTask incrementalTask = new IncrementalTask(taskConfig.getJobConfig().getConcurrency(),
- taskConfig.getDumperConfig(), taskConfig.getImporterConfig(), pipelineChannelCreator, dataSourceManager, sourceMetaDataLoader, incrementalDumperExecuteEngine);
+ taskConfig.getDumperConfig(), taskConfig.getImporterConfig(), pipelineChannelCreator, dataSourceManager, sourceMetaDataLoader, incrementalDumperExecuteEngine, persistCallback);
jobContext.getIncrementalTasks().add(incrementalTask);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
index 6b0f22b27b6..620f3ec611a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
@@ -21,16 +21,10 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
/**
* Rule altered job scheduler center.
@@ -42,10 +36,14 @@ public final class RuleAlteredJobSchedulerCenter {
private static final Map<String, Map<Integer, RuleAlteredJobScheduler>> JOB_SCHEDULER_MAP = new ConcurrentHashMap<>();
- private static final ScheduledExecutorService JOB_PERSIST_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("scaling-job-persist-%d"));
-
- static {
- JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new PersistJobContextRunnable(), 10, 10, TimeUnit.SECONDS);
+ /**
+ * Get job scheduler map.
+ *
+ * <p>If no map is registered for the job id yet, an empty map is created and registered as a side effect.</p>
+ *
+ * @param jobId job id
+ * @return job schedulers of the job, keyed by sharding item
+ */
+ public static Map<Integer, RuleAlteredJobScheduler> getJobSchedulerMap(final String jobId) {
+ return JOB_SCHEDULER_MAP.computeIfAbsent(jobId, k -> new ConcurrentHashMap<>());
}
/**
@@ -65,6 +63,7 @@ public final class RuleAlteredJobSchedulerCenter {
RuleAlteredJobScheduler jobScheduler = new RuleAlteredJobScheduler(jobContext);
jobScheduler.start();
schedulerMap.put(shardingItem, jobScheduler);
+ RuleAlteredJobPersistService.addJobPersistParameter(jobId, shardingItem);
}
/**
@@ -83,6 +82,7 @@ public final class RuleAlteredJobSchedulerCenter {
entry.getValue().stop();
}
JOB_SCHEDULER_MAP.remove(jobId);
+ RuleAlteredJobPersistService.removeJobPersistParameter(jobId);
}
/**
@@ -101,21 +101,4 @@ public final class RuleAlteredJobSchedulerCenter {
entry.getValue().getJobContext().setStatus(jobStatus);
}
}
-
- private static final class PersistJobContextRunnable implements Runnable {
-
- @Override
- public void run() {
- GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
- for (Entry<String, Map<Integer, RuleAlteredJobScheduler>> entry : JOB_SCHEDULER_MAP.entrySet()) {
- try {
- entry.getValue().forEach((shardingItem, jobScheduler) -> repositoryAPI.persistJobProgress(jobScheduler.getJobContext()));
- // CHECKSTYLE:OFF
- } catch (final Exception ex) {
- // CHECKSTYLE:ON
- log.error("persist job {} context failed.", entry.getKey(), ex);
- }
- }
- }
- }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
index 81d17554660..2dc4079d45f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
@@ -46,6 +46,7 @@ import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJ
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration.InputConfiguration;
+import org.apache.shardingsphere.scaling.core.job.persist.AsyncPipelineJobPersistCallback;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -76,8 +77,9 @@ public final class InventoryTaskSplitter {
DataSource dataSource = jobContext.getSourceDataSource();
PipelineTableMetaDataLoader metaDataLoader = jobContext.getSourceMetaDataLoader();
ExecuteEngine importerExecuteEngine = jobContext.getRuleAlteredContext().getImporterExecuteEngine();
+ AsyncPipelineJobPersistCallback persistCallback = new AsyncPipelineJobPersistCallback(jobContext.getJobId(), jobContext.getShardingItem());
for (InventoryDumperConfiguration each : splitDumperConfig(jobContext, taskConfig.getDumperConfig())) {
- result.add(new InventoryTask(each, taskConfig.getImporterConfig(), pipelineChannelCreator, dataSourceManager, dataSource, metaDataLoader, importerExecuteEngine));
+ result.add(new InventoryTask(each, taskConfig.getImporterConfig(), pipelineChannelCreator, dataSourceManager, dataSource, metaDataLoader, importerExecuteEngine, persistCallback));
}
return result;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterFactory.java
index 430496219ca..16999111f08 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterFactory.java
@@ -22,6 +22,7 @@ import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
@@ -41,13 +42,16 @@ public final class ImporterFactory {
* @param importerConfig importer configuration
* @param dataSourceManager data source manager
* @param channel channel
+ * @param pipelineJobPersistCallback job persist callback
* @return importer
*/
@SneakyThrows(ReflectiveOperationException.class)
- public static Importer createImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
+ public static Importer createImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
+ final PipelineJobPersistCallback pipelineJobPersistCallback) {
String databaseType = importerConfig.getDataSourceConfig().getDatabaseType().getType();
ScalingEntry scalingEntry = ScalingEntryFactory.getInstance(databaseType);
- Constructor<? extends Importer> constructor = scalingEntry.getImporterClass().getConstructor(ImporterConfiguration.class, PipelineDataSourceManager.class, PipelineChannel.class);
- return constructor.newInstance(importerConfig, dataSourceManager, channel);
+ Constructor<? extends Importer> constructor = scalingEntry.getImporterClass().getConstructor(ImporterConfiguration.class, PipelineDataSourceManager.class, PipelineChannel.class,
+ PipelineJobPersistCallback.class);
+ return constructor.newInstance(importerConfig, dataSourceManager, channel, pipelineJobPersistCallback);
}
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/persist/AsyncPipelineJobPersistCallback.java
similarity index 54%
copy from shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/persist/AsyncPipelineJobPersistCallback.java
index 214ad50cccf..08b2b1336a6 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/persist/AsyncPipelineJobPersistCallback.java
@@ -15,28 +15,34 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.fixture;
+package org.apache.shardingsphere.scaling.core.job.persist;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
+import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPersistService;
-public final class FixtureImporter implements Importer {
+/**
+ * Async job process persist callback.
+ */
+@RequiredArgsConstructor
+public final class AsyncPipelineJobPersistCallback implements PipelineJobPersistCallback {
- public FixtureImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
- }
+ private final String jobId;
+
+ private final int shardingItem;
@Override
- public void start() {
+ public String getJobId() {
+ return jobId;
}
@Override
- public void stop() {
+ public int getShardingItem() {
+ return shardingItem;
}
@Override
- public void run() {
- start();
+ public void pushPersistEvent() {
+ RuleAlteredJobPersistService.triggerPersist(jobId, shardingItem);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
index 843c6448057..002f0869450 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.mysql.importer;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
@@ -27,8 +28,9 @@ import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
*/
public final class MySQLImporter extends AbstractImporter {
- public MySQLImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
- super(importerConfig, dataSourceManager, channel);
+ public MySQLImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
+ final PipelineJobPersistCallback pipelineJobPersistCallback) {
+ super(importerConfig, dataSourceManager, channel, pipelineJobPersistCallback);
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java
index 6d5be6db7d1..c864e5c04a4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.opengauss.importer;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
@@ -28,8 +29,9 @@ import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
*/
public final class OpenGaussImporter extends AbstractImporter {
- public OpenGaussImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
- super(importerConfig, dataSourceManager, channel);
+ public OpenGaussImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
+ final PipelineJobPersistCallback pipelineJobPersistCallback) {
+ super(importerConfig, dataSourceManager, channel, pipelineJobPersistCallback);
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java
index 8e96369d70c..26c1bac7aa6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.importer;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
@@ -28,8 +29,9 @@ import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
*/
public final class PostgreSQLImporter extends AbstractImporter {
- public PostgreSQLImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
- super(importerConfig, dataSourceManager, channel);
+ public PostgreSQLImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
+ final PipelineJobPersistCallback pipelineJobPersistCallback) {
+ super(importerConfig, dataSourceManager, channel, pipelineJobPersistCallback);
}
@Override
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index 17da802eeb9..e53c0880a90 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -23,7 +23,6 @@ import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
@@ -53,11 +52,11 @@ import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -351,8 +350,9 @@ public abstract class BaseITCase {
TimeUnit.SECONDS.timedJoin(increaseTaskThread, 60);
}
log.info("jobId: {}", jobId);
- Map<String, String> actualStatusMap = new HashMap<>(2, 1);
+ Set<String> actualStatus = null;
for (int i = 0; i < 15; i++) {
+ actualStatus = new HashSet<>();
List<Map<String, Object>> showScalingStatusResMap = showScalingStatus(jobId);
log.info("show scaling status result: {}", showScalingStatusResMap);
boolean finished = true;
@@ -361,8 +361,7 @@ public abstract class BaseITCase {
assertThat(status, not(JobStatus.PREPARING_FAILURE.name()));
assertThat(status, not(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name()));
assertThat(status, not(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name()));
- String datasourceName = entry.get("data_source").toString();
- actualStatusMap.put(datasourceName, status);
+ actualStatus.add(status);
if (!Objects.equals(status, JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
finished = false;
break;
@@ -374,7 +373,7 @@ public abstract class BaseITCase {
assertBeforeApplyScalingMetadataCorrectly();
ThreadUtil.sleep(4, TimeUnit.SECONDS);
}
- assertThat(actualStatusMap.values().stream().filter(StringUtils::isNotBlank).collect(Collectors.toSet()), is(Collections.singleton(JobStatus.EXECUTE_INCREMENTAL_TASK.name())));
+ assertThat(actualStatus, is(Collections.singleton(JobStatus.EXECUTE_INCREMENTAL_TASK.name())));
}
protected List<Map<String, Object>> showScalingStatus(final String jobId) {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index aee49a75fdc..12cfdc7c982 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -28,6 +28,7 @@ import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobPersistCallback;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.JobProgressYamlSwapper;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
@@ -159,7 +160,7 @@ public final class GovernanceRepositoryAPIImplTest {
PipelineDataSourceWrapper dataSource = mock(PipelineDataSourceWrapper.class);
PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource);
return new InventoryTask(dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelCreator(),
- new PipelineDataSourceManager(), dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine());
+ new PipelineDataSourceManager(), dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobPersistCallback());
}
private IncrementalTask mockIncrementalTask(final TaskConfiguration taskConfig) {
@@ -167,6 +168,6 @@ public final class GovernanceRepositoryAPIImplTest {
dumperConfig.setPosition(new PlaceholderPosition());
PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
return new IncrementalTask(3, dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelCreator(),
- new PipelineDataSourceManager(), metaDataLoader, PipelineContextUtil.getExecuteEngine());
+ new PipelineDataSourceManager(), metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobPersistCallback());
}
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
index 214ad50cccf..4932eb50cea 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
@@ -19,12 +19,14 @@ package org.apache.shardingsphere.data.pipeline.core.fixture;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
public final class FixtureImporter implements Importer {
- public FixtureImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
+ public FixtureImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
+ final PipelineJobPersistCallback pipelineJobPersistCallback) {
}
@Override
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobPersistCallback.java
similarity index 59%
copy from shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
copy to shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobPersistCallback.java
index 214ad50cccf..00e1a3f30d8 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobPersistCallback.java
@@ -17,26 +17,21 @@
package org.apache.shardingsphere.data.pipeline.core.fixture;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
-public final class FixtureImporter implements Importer {
-
- public FixtureImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
- }
+public final class FixturePipelineJobPersistCallback implements PipelineJobPersistCallback {
@Override
- public void start() {
+ public String getJobId() {
+ return null;
}
@Override
- public void stop() {
+ public int getShardingItem() {
+ return 0;
}
@Override
- public void run() {
- start();
+ public void pushPersistEvent() {
}
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
index 6b1b19fabea..14406182f8b 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
@@ -30,6 +30,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobPersistCallback;
import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
import org.junit.Before;
import org.junit.Test;
@@ -81,7 +82,7 @@ public final class AbstractImporterTest {
@Before
public void setUp() throws SQLException {
- jdbcImporter = new AbstractImporter(mockImporterConfiguration(), dataSourceManager, channel) {
+ jdbcImporter = new AbstractImporter(mockImporterConfiguration(), dataSourceManager, channel, new FixturePipelineJobPersistCallback()) {
@Override
protected String getSchemaName(final String logicTableName) {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index b9fe4b5aade..af11ca297ec 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobPersistCallback;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
@@ -54,7 +55,7 @@ public final class IncrementalTaskTest {
PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
PipelineContextUtil.getPipelineChannelCreator(),
- new PipelineDataSourceManager(), metaDataLoader, PipelineContextUtil.getExecuteEngine());
+ new PipelineDataSourceManager(), metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobPersistCallback());
}
@Test
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index 45583ff664b..dc14ac4d84c 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobPersistCallback;
import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
@@ -72,7 +73,7 @@ public final class InventoryTaskTest {
try (
InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
PipelineContextUtil.getPipelineChannelCreator(),
- DATA_SOURCE_MANAGER, dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine())) {
+ DATA_SOURCE_MANAGER, dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobPersistCallback())) {
inventoryTask.start();
}
}
@@ -87,7 +88,7 @@ public final class InventoryTaskTest {
try (
InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
PipelineContextUtil.getPipelineChannelCreator(),
- new PipelineDataSourceManager(), dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine())) {
+ new PipelineDataSourceManager(), dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobPersistCallback())) {
inventoryTask.start();
assertThat(inventoryTask.getProgress().getPosition(), instanceOf(IntegerPrimaryKeyPosition.class));
}