You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/01/27 08:36:01 UTC

[shardingsphere] branch master updated: Refactor start() and stop() of AbstractLifecycleExecutor; Refactor close() in inventory task and incremental task; Refactor Importer.write() (#15112)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f41cb0  Refactor start() and stop() of AbstractLifecycleExecutor; Refactor close() in inventory task and incremental task; Refactor Importer.write() (#15112)
2f41cb0 is described below

commit 2f41cb09fff8fa18965163a434b7fd646ca2a634
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Thu Jan 27 16:34:54 2022 +0800

    Refactor start() and stop() of AbstractLifecycleExecutor; Refactor close() in inventory task and incremental task; Refactor Importer.write() (#15112)
---
 .../pipeline/core/execute/FinishedCheckJobExecutor.java |  8 +++++---
 .../data/pipeline/core/execute/PipelineJobExecutor.java |  8 +++++---
 .../data/pipeline/core/importer/AbstractImporter.java   | 10 ++++++----
 .../core/ingest/dumper/AbstractInventoryDumper.java     | 17 ++++++++++++++---
 .../data/pipeline/core/task/IncrementalTask.java        | 10 +++++++---
 .../data/pipeline/core/task/InventoryTask.java          |  7 +++----
 .../scenario/rulealtered/RuleAlteredJobScheduler.java   |  7 ++++---
 .../pipeline/mysql/ingest/MySQLIncrementalDumper.java   |  7 +++++--
 .../pipeline/opengauss/ingest/OpenGaussWalDumper.java   |  9 ++++++---
 .../pipeline/postgresql/ingest/PostgreSQLWalDumper.java |  8 +++++---
 .../api/executor/AbstractLifecycleExecutor.java         | 17 ++++++++++++++---
 .../data/pipeline/spi/importer/Importer.java            |  5 -----
 .../data/pipeline/core/fixture/FixtureImporter.java     |  5 -----
 .../pipeline/core/fixture/FixtureIncrementalDumper.java |  4 ++--
 14 files changed, 76 insertions(+), 46 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/FinishedCheckJobExecutor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/FinishedCheckJobExecutor.java
index 3ea73ca..2f287e9 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/FinishedCheckJobExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/FinishedCheckJobExecutor.java
@@ -35,13 +35,15 @@ public final class FinishedCheckJobExecutor extends AbstractLifecycleExecutor {
     private static final String CRON_EXPRESSION = "0 * * * * ?";
     
     @Override
-    public void start() {
-        super.start();
-        log.info("Start finished check job executor.");
+    protected void doStart() {
         new ScheduleJobBootstrap(PipelineAPIFactory.getRegistryCenter(), new FinishedCheckJob(), createJobConfig()).schedule();
     }
     
     private JobConfiguration createJobConfig() {
         return JobConfiguration.newBuilder(JOB_NAME, 1).cron(CRON_EXPRESSION).build();
     }
+    
+    @Override
+    protected void doStop() {
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index e4a9509..76125d4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -44,9 +44,7 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
     private static final Set<String> EXECUTING_JOBS = Sets.newConcurrentHashSet();
     
     @Override
-    public void start() {
-        super.start();
-        log.info("Start scaling job executor.");
+    protected void doStart() {
         watchGovernanceRepositoryConfiguration();
     }
     
@@ -96,4 +94,8 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
             log.info("{} added to executing jobs failed since it already exists", jobConfigPOJO.getJobName());
         }
     }
+    
+    @Override
+    protected void doStop() {
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
index ff0d17c..bb154a2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
@@ -76,13 +76,11 @@ public abstract class AbstractImporter extends AbstractLifecycleExecutor impleme
     protected abstract PipelineSQLBuilder createSQLBuilder(Map<String, Set<String>> shardingColumnsMap);
     
     @Override
-    public final void start() {
-        super.start();
+    protected void doStart() {
         write();
     }
     
-    @Override
-    public final void write() {
+    private void write() {
         log.info("importer write");
         int round = 1;
         int rowCount = 0;
@@ -218,4 +216,8 @@ public abstract class AbstractImporter extends AbstractLifecycleExecutor impleme
             ps.executeBatch();
         }
     }
+    
+    @Override
+    protected void doStop() {
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
index 4819a1b..dbb54ae 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
@@ -88,8 +88,7 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
     }
     
     @Override
-    public final void start() {
-        super.start();
+    protected void doStart() {
         dump();
     }
     
@@ -103,6 +102,10 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
             Optional<Number> maxUniqueKeyValue;
             while ((maxUniqueKeyValue = dump0(conn, sql, startUniqueKeyValue, round++)).isPresent()) {
                 startUniqueKeyValue = maxUniqueKeyValue.get();
+                if (!isRunning()) {
+                    log.info("inventory dump, running is false, break");
+                    break;
+                }
             }
             log.info("inventory dump done, round={}, maxUniqueKeyValue={}", round, maxUniqueKeyValue);
         } catch (final SQLException ex) {
@@ -132,7 +135,7 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
                 ResultSetMetaData metaData = resultSet.getMetaData();
                 int rowCount = 0;
                 Number maxUniqueKeyValue = null;
-                while (isRunning() && resultSet.next()) {
+                while (resultSet.next()) {
                     DataRecord record = new DataRecord(newPosition(resultSet), metaData.getColumnCount());
                     record.setType(IngestDataChangeType.INSERT);
                     record.setTableName(inventoryDumperConfig.getTableNameMap().get(inventoryDumperConfig.getTableName()));
@@ -146,6 +149,10 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
                     }
                     pushRecord(record);
                     rowCount++;
+                    if (!isRunning()) {
+                        log.info("dump, running is false, break");
+                        break;
+                    }
                 }
                 if (log.isDebugEnabled()) {
                     log.debug("dump, round={}, rowCount={}, maxUniqueKeyValue={}", round, rowCount, maxUniqueKeyValue);
@@ -193,4 +200,8 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
     private void pushRecord(final Record record) {
         channel.pushRecord(record);
     }
+    
+    @Override
+    protected void doStop() {
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 4c967f2..166e689 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -48,7 +48,7 @@ import java.util.concurrent.Future;
  */
 @Slf4j
 @ToString(exclude = {"incrementalDumperExecuteEngine", "dataSourceManager", "dumper", "progress"})
-public final class IncrementalTask extends AbstractLifecycleExecutor implements PipelineTask {
+public final class IncrementalTask extends AbstractLifecycleExecutor implements PipelineTask, AutoCloseable {
     
     @Getter
     private final String taskId;
@@ -81,7 +81,7 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements
     }
     
     @Override
-    public void start() {
+    protected void doStart() {
         progress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
         Future<?> future = incrementalDumperExecuteEngine.submitAll(importers, getExecuteCallback());
         dumper.start();
@@ -133,11 +133,15 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements
     }
     
     @Override
-    public void stop() {
+    protected void doStop() {
         dumper.stop();
         for (Importer each : importers) {
             each.stop();
         }
+    }
+    
+    @Override
+    public void close() {
         channel.close();
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index e02c4a1..ffd0f97 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -82,7 +82,7 @@ public final class InventoryTask extends AbstractLifecycleExecutor implements Pi
     }
     
     @Override
-    public void start() {
+    protected void doStart() {
         Future<?> future = importerExecuteEngine.submit(importer, new ExecuteCallback() {
             
             @Override
@@ -131,10 +131,9 @@ public final class InventoryTask extends AbstractLifecycleExecutor implements Pi
     }
     
     @Override
-    public void stop() {
+    protected void doStop() {
         dumper.stop();
         importer.stop();
-        channel.close();
     }
     
     @Override
@@ -144,6 +143,6 @@ public final class InventoryTask extends AbstractLifecycleExecutor implements Pi
     
     @Override
     public void close() {
-        dataSourceManager.close();
+        channel.close();
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
index d7011b2..e48fc0e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
@@ -25,7 +25,6 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
-import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 
 /**
  * Rule altered job scheduler.
@@ -58,13 +57,15 @@ public final class RuleAlteredJobScheduler implements Runnable {
                 jobPreparer.cleanup(jobContext);
             }
         }
-        for (PipelineTask each : jobContext.getInventoryTasks()) {
+        for (InventoryTask each : jobContext.getInventoryTasks()) {
             log.info("stop inventory task {} - {}", jobContext.getJobId(), each.getTaskId());
             each.stop();
+            each.close();
         }
-        for (PipelineTask each : jobContext.getIncrementalTasks()) {
+        for (IncrementalTask each : jobContext.getIncrementalTasks()) {
             log.info("stop incremental task {} - {}", jobContext.getJobId(), each.getTaskId());
             each.stop();
+            each.close();
         }
         jobContext.close();
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index aac2245..5395875 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -88,8 +88,7 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
     }
     
     @Override
-    public void start() {
-        super.start();
+    protected void doStart() {
         dump();
     }
     
@@ -197,4 +196,8 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
     private void pushRecord(final Record record) {
         channel.pushRecord(record);
     }
+    
+    @Override
+    protected void doStop() {
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
index ad83280..5c0c601 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
@@ -74,11 +74,10 @@ public final class OpenGaussWalDumper extends AbstractLifecycleExecutor implemen
     }
     
     @Override
-    public void start() {
-        super.start();
+    protected void doStart() {
         dump();
     }
-
+    
     private PgConnection getReplicationConn() throws SQLException {
         return logicalReplication
                 .createPgConnection((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig())
@@ -131,5 +130,9 @@ public final class OpenGaussWalDumper extends AbstractLifecycleExecutor implemen
     private void pushRecord(final Record record) {
         channel.pushRecord(record);
     }
+    
+    @Override
+    protected void doStop() {
+    }
 }
 
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
index 7f81e63..7a0acd7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
@@ -70,8 +70,7 @@ public final class PostgreSQLWalDumper extends AbstractIncrementalDumper<WalPosi
     }
     
     @Override
-    public void start() {
-        super.start();
+    protected void doStart() {
         dump();
     }
     
@@ -101,5 +100,8 @@ public final class PostgreSQLWalDumper extends AbstractIncrementalDumper<WalPosi
     private void pushRecord(final Record record) {
         channel.pushRecord(record);
     }
+    
+    @Override
+    protected void doStop() {
+    }
 }
-
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java
index e358bf7..a71d863 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java
@@ -25,8 +25,6 @@ import lombok.extern.slf4j.Slf4j;
 /**
  * Abstract lifecycle executor.
  */
-@Getter
-@Setter
 @Slf4j
 public abstract class AbstractLifecycleExecutor implements LifecycleExecutor {
     
@@ -34,17 +32,30 @@ public abstract class AbstractLifecycleExecutor implements LifecycleExecutor {
     @Getter(AccessLevel.PROTECTED)
     private volatile boolean running;
     
+    private volatile boolean stopped;
+    
     @Override
     public void start() {
+        log.info("start lifecycle executor: {}", super.toString());
         running = true;
+        doStart();
     }
     
+    protected abstract void doStart();
+    
     @Override
-    public void stop() {
+    public final void stop() {
+        if (stopped) {
+            return;
+        }
         log.info("stop lifecycle executor: {}", super.toString());
         running = false;
+        doStop();
+        stopped = true;
     }
     
+    protected abstract void doStop();
+    
     @Override
     public final void run() {
         start();
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/Importer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/Importer.java
index 1639ced..2ecb413 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/Importer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/Importer.java
@@ -23,9 +23,4 @@ import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
  * Importer.
  */
 public interface Importer extends LifecycleExecutor {
-    
-    /**
-     * Write data to channel.
-     */
-    void write();
 }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
index e5a6349..214ad50 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
@@ -28,10 +28,6 @@ public final class FixtureImporter implements Importer {
     }
     
     @Override
-    public void write() {
-    }
-    
-    @Override
     public void start() {
     }
     
@@ -42,6 +38,5 @@ public final class FixtureImporter implements Importer {
     @Override
     public void run() {
         start();
-        write();
     }
 }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
index 979194a..096d6b8 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
@@ -30,10 +30,10 @@ public final class FixtureIncrementalDumper extends AbstractIncrementalDumper<Fi
     }
     
     @Override
-    public void start() {
+    protected void doStart() {
     }
     
     @Override
-    public void stop() {
+    protected void doStop() {
     }
 }