You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/01/27 08:36:01 UTC
[shardingsphere] branch master updated: Refactor start() and stop() of AbstractLifecycleExecutor; Refactor close() in inventory task and incremental task; Refactor Importer.write() (#15112)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 2f41cb0 Refactor start() and stop() of AbstractLifecycleExecutor; Refactor close() in inventory task and incremental task; Refactor Importer.write() (#15112)
2f41cb0 is described below
commit 2f41cb09fff8fa18965163a434b7fd646ca2a634
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Thu Jan 27 16:34:54 2022 +0800
Refactor start() and stop() of AbstractLifecycleExecutor; Refactor close() in inventory task and incremental task; Refactor Importer.write() (#15112)
---
.../pipeline/core/execute/FinishedCheckJobExecutor.java | 8 +++++---
.../data/pipeline/core/execute/PipelineJobExecutor.java | 8 +++++---
.../data/pipeline/core/importer/AbstractImporter.java | 10 ++++++----
.../core/ingest/dumper/AbstractInventoryDumper.java | 17 ++++++++++++++---
.../data/pipeline/core/task/IncrementalTask.java | 10 +++++++---
.../data/pipeline/core/task/InventoryTask.java | 7 +++----
.../scenario/rulealtered/RuleAlteredJobScheduler.java | 7 ++++---
.../pipeline/mysql/ingest/MySQLIncrementalDumper.java | 7 +++++--
.../pipeline/opengauss/ingest/OpenGaussWalDumper.java | 9 ++++++---
.../pipeline/postgresql/ingest/PostgreSQLWalDumper.java | 8 +++++---
.../api/executor/AbstractLifecycleExecutor.java | 17 ++++++++++++++---
.../data/pipeline/spi/importer/Importer.java | 5 -----
.../data/pipeline/core/fixture/FixtureImporter.java | 5 -----
.../pipeline/core/fixture/FixtureIncrementalDumper.java | 4 ++--
14 files changed, 76 insertions(+), 46 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/FinishedCheckJobExecutor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/FinishedCheckJobExecutor.java
index 3ea73ca..2f287e9 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/FinishedCheckJobExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/FinishedCheckJobExecutor.java
@@ -35,13 +35,15 @@ public final class FinishedCheckJobExecutor extends AbstractLifecycleExecutor {
private static final String CRON_EXPRESSION = "0 * * * * ?";
@Override
- public void start() {
- super.start();
- log.info("Start finished check job executor.");
+ protected void doStart() {
new ScheduleJobBootstrap(PipelineAPIFactory.getRegistryCenter(), new FinishedCheckJob(), createJobConfig()).schedule();
}
private JobConfiguration createJobConfig() {
return JobConfiguration.newBuilder(JOB_NAME, 1).cron(CRON_EXPRESSION).build();
}
+
+ @Override
+ protected void doStop() {
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index e4a9509..76125d4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -44,9 +44,7 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
private static final Set<String> EXECUTING_JOBS = Sets.newConcurrentHashSet();
@Override
- public void start() {
- super.start();
- log.info("Start scaling job executor.");
+ protected void doStart() {
watchGovernanceRepositoryConfiguration();
}
@@ -96,4 +94,8 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
log.info("{} added to executing jobs failed since it already exists", jobConfigPOJO.getJobName());
}
}
+
+ @Override
+ protected void doStop() {
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
index ff0d17c..bb154a2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
@@ -76,13 +76,11 @@ public abstract class AbstractImporter extends AbstractLifecycleExecutor impleme
protected abstract PipelineSQLBuilder createSQLBuilder(Map<String, Set<String>> shardingColumnsMap);
@Override
- public final void start() {
- super.start();
+ protected void doStart() {
write();
}
- @Override
- public final void write() {
+ private void write() {
log.info("importer write");
int round = 1;
int rowCount = 0;
@@ -218,4 +216,8 @@ public abstract class AbstractImporter extends AbstractLifecycleExecutor impleme
ps.executeBatch();
}
}
+
+ @Override
+ protected void doStop() {
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
index 4819a1b..dbb54ae 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
@@ -88,8 +88,7 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
}
@Override
- public final void start() {
- super.start();
+ protected void doStart() {
dump();
}
@@ -103,6 +102,10 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
Optional<Number> maxUniqueKeyValue;
while ((maxUniqueKeyValue = dump0(conn, sql, startUniqueKeyValue, round++)).isPresent()) {
startUniqueKeyValue = maxUniqueKeyValue.get();
+ if (!isRunning()) {
+ log.info("inventory dump, running is false, break");
+ break;
+ }
}
log.info("inventory dump done, round={}, maxUniqueKeyValue={}", round, maxUniqueKeyValue);
} catch (final SQLException ex) {
@@ -132,7 +135,7 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
ResultSetMetaData metaData = resultSet.getMetaData();
int rowCount = 0;
Number maxUniqueKeyValue = null;
- while (isRunning() && resultSet.next()) {
+ while (resultSet.next()) {
DataRecord record = new DataRecord(newPosition(resultSet), metaData.getColumnCount());
record.setType(IngestDataChangeType.INSERT);
record.setTableName(inventoryDumperConfig.getTableNameMap().get(inventoryDumperConfig.getTableName()));
@@ -146,6 +149,10 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
}
pushRecord(record);
rowCount++;
+ if (!isRunning()) {
+ log.info("dump, running is false, break");
+ break;
+ }
}
if (log.isDebugEnabled()) {
log.debug("dump, round={}, rowCount={}, maxUniqueKeyValue={}", round, rowCount, maxUniqueKeyValue);
@@ -193,4 +200,8 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
private void pushRecord(final Record record) {
channel.pushRecord(record);
}
+
+ @Override
+ protected void doStop() {
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 4c967f2..166e689 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -48,7 +48,7 @@ import java.util.concurrent.Future;
*/
@Slf4j
@ToString(exclude = {"incrementalDumperExecuteEngine", "dataSourceManager", "dumper", "progress"})
-public final class IncrementalTask extends AbstractLifecycleExecutor implements PipelineTask {
+public final class IncrementalTask extends AbstractLifecycleExecutor implements PipelineTask, AutoCloseable {
@Getter
private final String taskId;
@@ -81,7 +81,7 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements
}
@Override
- public void start() {
+ protected void doStart() {
progress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
Future<?> future = incrementalDumperExecuteEngine.submitAll(importers, getExecuteCallback());
dumper.start();
@@ -133,11 +133,15 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements
}
@Override
- public void stop() {
+ protected void doStop() {
dumper.stop();
for (Importer each : importers) {
each.stop();
}
+ }
+
+ @Override
+ public void close() {
channel.close();
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index e02c4a1..ffd0f97 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -82,7 +82,7 @@ public final class InventoryTask extends AbstractLifecycleExecutor implements Pi
}
@Override
- public void start() {
+ protected void doStart() {
Future<?> future = importerExecuteEngine.submit(importer, new ExecuteCallback() {
@Override
@@ -131,10 +131,9 @@ public final class InventoryTask extends AbstractLifecycleExecutor implements Pi
}
@Override
- public void stop() {
+ protected void doStop() {
dumper.stop();
importer.stop();
- channel.close();
}
@Override
@@ -144,6 +143,6 @@ public final class InventoryTask extends AbstractLifecycleExecutor implements Pi
@Override
public void close() {
- dataSourceManager.close();
+ channel.close();
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
index d7011b2..e48fc0e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
@@ -25,7 +25,6 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
-import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
/**
* Rule altered job scheduler.
@@ -58,13 +57,15 @@ public final class RuleAlteredJobScheduler implements Runnable {
jobPreparer.cleanup(jobContext);
}
}
- for (PipelineTask each : jobContext.getInventoryTasks()) {
+ for (InventoryTask each : jobContext.getInventoryTasks()) {
log.info("stop inventory task {} - {}", jobContext.getJobId(), each.getTaskId());
each.stop();
+ each.close();
}
- for (PipelineTask each : jobContext.getIncrementalTasks()) {
+ for (IncrementalTask each : jobContext.getIncrementalTasks()) {
log.info("stop incremental task {} - {}", jobContext.getJobId(), each.getTaskId());
each.stop();
+ each.close();
}
jobContext.close();
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index aac2245..5395875 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -88,8 +88,7 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
}
@Override
- public void start() {
- super.start();
+ protected void doStart() {
dump();
}
@@ -197,4 +196,8 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
private void pushRecord(final Record record) {
channel.pushRecord(record);
}
+
+ @Override
+ protected void doStop() {
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
index ad83280..5c0c601 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
@@ -74,11 +74,10 @@ public final class OpenGaussWalDumper extends AbstractLifecycleExecutor implemen
}
@Override
- public void start() {
- super.start();
+ protected void doStart() {
dump();
}
-
+
private PgConnection getReplicationConn() throws SQLException {
return logicalReplication
.createPgConnection((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig())
@@ -131,5 +130,9 @@ public final class OpenGaussWalDumper extends AbstractLifecycleExecutor implemen
private void pushRecord(final Record record) {
channel.pushRecord(record);
}
+
+ @Override
+ protected void doStop() {
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
index 7f81e63..7a0acd7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
@@ -70,8 +70,7 @@ public final class PostgreSQLWalDumper extends AbstractIncrementalDumper<WalPosi
}
@Override
- public void start() {
- super.start();
+ protected void doStart() {
dump();
}
@@ -101,5 +100,8 @@ public final class PostgreSQLWalDumper extends AbstractIncrementalDumper<WalPosi
private void pushRecord(final Record record) {
channel.pushRecord(record);
}
+
+ @Override
+ protected void doStop() {
+ }
}
-
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java
index e358bf7..a71d863 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java
@@ -25,8 +25,6 @@ import lombok.extern.slf4j.Slf4j;
/**
* Abstract lifecycle executor.
*/
-@Getter
-@Setter
@Slf4j
public abstract class AbstractLifecycleExecutor implements LifecycleExecutor {
@@ -34,17 +32,30 @@ public abstract class AbstractLifecycleExecutor implements LifecycleExecutor {
@Getter(AccessLevel.PROTECTED)
private volatile boolean running;
+ private volatile boolean stopped;
+
@Override
public void start() {
+ log.info("start lifecycle executor: {}", super.toString());
running = true;
+ doStart();
}
+ protected abstract void doStart();
+
@Override
- public void stop() {
+ public final void stop() {
+ if (stopped) {
+ return;
+ }
log.info("stop lifecycle executor: {}", super.toString());
running = false;
+ doStop();
+ stopped = true;
}
+ protected abstract void doStop();
+
@Override
public final void run() {
start();
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/Importer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/Importer.java
index 1639ced..2ecb413 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/Importer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/Importer.java
@@ -23,9 +23,4 @@ import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
* Importer.
*/
public interface Importer extends LifecycleExecutor {
-
- /**
- * Write data to channel.
- */
- void write();
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
index e5a6349..214ad50 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
@@ -28,10 +28,6 @@ public final class FixtureImporter implements Importer {
}
@Override
- public void write() {
- }
-
- @Override
public void start() {
}
@@ -42,6 +38,5 @@ public final class FixtureImporter implements Importer {
@Override
public void run() {
start();
- write();
}
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
index 979194a..096d6b8 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
@@ -30,10 +30,10 @@ public final class FixtureIncrementalDumper extends AbstractIncrementalDumper<Fi
}
@Override
- public void start() {
+ protected void doStart() {
}
@Override
- public void stop() {
+ protected void doStop() {
}
}