You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/10/03 13:49:56 UTC

[shardingsphere] branch master updated: Remove AbstractIncrementalDumper (#21330)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 67ca6947f89 Remove AbstractIncrementalDumper (#21330)
67ca6947f89 is described below

commit 67ca6947f89ce49debc6ed708aa619848327be6f
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Mon Oct 3 21:49:46 2022 +0800

    Remove AbstractIncrementalDumper (#21330)
---
 .../ingest/dumper/AbstractIncrementalDumper.java   | 37 ---------------
 .../datasource/AbstractDataSourcePreparer.java     |  1 +
 .../mysql/ingest/MySQLIncrementalDumper.java       | 54 +++++++++-------------
 .../opengauss/ingest/OpenGaussWalDumper.java       | 18 ++------
 .../postgresql/ingest/PostgreSQLWalDumper.java     | 18 ++------
 .../core/fixture/FixtureIncrementalDumper.java     | 15 ++----
 .../fixture/FixtureIncrementalDumperCreator.java   |  2 +-
 7 files changed, 34 insertions(+), 111 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java
deleted file mode 100644
index bd396dac9cb..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
-
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
-
-/**
- * Abstract incremental dumper.
- *
- * @param <P> generic type of {@linkplain IngestPosition}
- */
-public abstract class AbstractIncrementalDumper<P> extends AbstractLifecycleExecutor implements IncrementalDumper {
-    
-    public AbstractIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<P> position,
-                                     final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
-    }
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index 13a134fcd9b..c41301f1190 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -83,6 +83,7 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
                 statement.execute(sql);
             }
         } catch (final SQLException ignored) {
+            // TODO should not ignore the exception, if do not catch it, the scaling IT will fail.
         }
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 9d3cee95a05..c74e7ab13ff 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -22,20 +22,20 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJdbcConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractRowsEvent;
@@ -59,7 +59,7 @@ import java.util.Optional;
  * MySQL incremental dumper.
  */
 @Slf4j
-public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<BinlogPosition> {
+public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor implements IncrementalDumper {
     
     private final DumperConfiguration dumperConfig;
     
@@ -75,7 +75,6 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
     
     public MySQLIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<BinlogPosition> binlogPosition,
                                   final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
-        super(dumperConfig, binlogPosition, channel, metaDataLoader);
         Preconditions.checkArgument(dumperConfig.getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration");
         this.dumperConfig = dumperConfig;
         this.binlogPosition = (BinlogPosition) binlogPosition;
@@ -90,10 +89,6 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
     
     @Override
     protected void runBlocking() {
-        dump();
-    }
-    
-    private void dump() {
         client.connect();
         client.subscribe(binlogPosition.getFilename(), binlogPosition.getPosition());
         int eventCount = 0;
@@ -105,11 +100,11 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
             eventCount += handleEvent(event);
         }
         log.info("incremental dump, eventCount={}", eventCount);
-        pushRecord(new FinishedRecord(new PlaceholderPosition()));
+        channel.pushRecord(new FinishedRecord(new PlaceholderPosition()));
     }
     
     private int handleEvent(final AbstractBinlogEvent event) {
-        if (event instanceof PlaceholderEvent || filter(catalog, (AbstractRowsEvent) event)) {
+        if (event instanceof PlaceholderEvent || !((AbstractRowsEvent) event).getDatabaseName().equals(catalog) || !dumperConfig.containsTable(((AbstractRowsEvent) event).getTableName())) {
             createPlaceholderRecord(event);
             return 0;
         }
@@ -117,21 +112,28 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
             PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((WriteRowsEvent) event).getTableName());
             handleWriteRowsEvent((WriteRowsEvent) event, tableMetaData);
             return 1;
-        } else if (event instanceof UpdateRowsEvent) {
+        }
+        if (event instanceof UpdateRowsEvent) {
             PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((UpdateRowsEvent) event).getTableName());
             handleUpdateRowsEvent((UpdateRowsEvent) event, tableMetaData);
             return 1;
-        } else if (event instanceof DeleteRowsEvent) {
+        }
+        if (event instanceof DeleteRowsEvent) {
             PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((DeleteRowsEvent) event).getTableName());
             handleDeleteRowsEvent((DeleteRowsEvent) event, tableMetaData);
             return 1;
-        } else {
-            return 0;
         }
+        return 0;
+    }
+    
+    private void createPlaceholderRecord(final AbstractBinlogEvent event) {
+        PlaceholderRecord record = new PlaceholderRecord(new BinlogPosition(event.getFileName(), event.getPosition(), event.getServerId()));
+        record.setCommitTime(event.getTimestamp() * 1000L);
+        channel.pushRecord(record);
     }
     
-    private boolean filter(final String database, final AbstractRowsEvent event) {
-        return !event.getDatabaseName().equals(database) || !dumperConfig.containsTable(event.getTableName());
+    private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) {
+        return metaDataLoader.getTableMetaData(dumperConfig.getSchemaName(new ActualTableName(actualTableName)), actualTableName);
     }
     
     private void handleWriteRowsEvent(final WriteRowsEvent event, final PipelineTableMetaData tableMetaData) {
@@ -142,14 +144,10 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
                 PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
                 record.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, tableMetaData.getColumnMetaData(i + 1).isUniqueKey()));
             }
-            pushRecord(record);
+            channel.pushRecord(record);
         }
     }
     
-    private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) {
-        return metaDataLoader.getTableMetaData(dumperConfig.getSchemaName(new ActualTableName(actualTableName)), actualTableName);
-    }
-    
     private void handleUpdateRowsEvent(final UpdateRowsEvent event, final PipelineTableMetaData tableMetaData) {
         for (int i = 0; i < event.getBeforeRows().size(); i++) {
             Serializable[] beforeValues = event.getBeforeRows().get(i);
@@ -165,7 +163,7 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
                         (columnMetaData.isPrimaryKey() && updated) ? handleValue(columnMetaData, oldValue) : null,
                         handleValue(columnMetaData, newValue), updated, columnMetaData.isPrimaryKey()));
             }
-            pushRecord(record);
+            channel.pushRecord(record);
         }
     }
     
@@ -177,7 +175,7 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
                 PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
                 record.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, tableMetaData.getColumnMetaData(i + 1).isUniqueKey()));
             }
-            pushRecord(record);
+            channel.pushRecord(record);
         }
     }
     
@@ -193,16 +191,6 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
         return result;
     }
     
-    private void createPlaceholderRecord(final AbstractBinlogEvent event) {
-        PlaceholderRecord record = new PlaceholderRecord(new BinlogPosition(event.getFileName(), event.getPosition(), event.getServerId()));
-        record.setCommitTime(event.getTimestamp() * 1000);
-        pushRecord(record);
-    }
-    
-    private void pushRecord(final Record record) {
-        channel.pushRecord(record);
-    }
-    
     @Override
     protected void doStop() {
         if (null != client) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
index 7660c49d3ca..247a48bab72 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
@@ -20,11 +20,11 @@ package org.apache.shardingsphere.data.pipeline.opengauss.ingest;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
 import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.OpenGaussLogicalReplication;
@@ -47,7 +47,7 @@ import java.sql.SQLException;
  * WAL dumper of openGauss.
  */
 @Slf4j
-public final class OpenGaussWalDumper extends AbstractIncrementalDumper<WalPosition> {
+public final class OpenGaussWalDumper extends AbstractLifecycleExecutor implements IncrementalDumper {
     
     private final DumperConfiguration dumperConfig;
     
@@ -61,7 +61,6 @@ public final class OpenGaussWalDumper extends AbstractIncrementalDumper<WalPosit
     
     public OpenGaussWalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position,
                               final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
-        super(dumperConfig, position, channel, metaDataLoader);
         ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
                 () -> new UnsupportedSQLOperationException("PostgreSQLWalDumper only support PipelineDataSourceConfiguration"));
         this.dumperConfig = dumperConfig;
@@ -73,10 +72,6 @@ public final class OpenGaussWalDumper extends AbstractIncrementalDumper<WalPosit
     
     @Override
     protected void runBlocking() {
-        dump();
-    }
-    
-    private void dump() {
         PGReplicationStream stream = null;
         try (PgConnection connection = getReplicationConnectionUnwrap()) {
             stream = logicalReplication.createReplicationStream(connection, walPosition.getLogSequenceNumber(), OpenGaussPositionInitializer.getUniqueSlotName(connection, dumperConfig.getJobId()));
@@ -88,8 +83,7 @@ public final class OpenGaussWalDumper extends AbstractIncrementalDumper<WalPosit
                     continue;
                 }
                 AbstractWalEvent event = decodingPlugin.decode(message, new OpenGaussLogSequenceNumber(stream.getLastReceiveLSN()));
-                Record record = walEventConverter.convert(event);
-                pushRecord(record);
+                channel.pushRecord(walEventConverter.convert(event));
             }
         } catch (final SQLException ex) {
             throw new IngestException(ex);
@@ -107,10 +101,6 @@ public final class OpenGaussWalDumper extends AbstractIncrementalDumper<WalPosit
         return logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig()).unwrap(PgConnection.class);
     }
     
-    private void pushRecord(final Record record) {
-        channel.pushRecord(record);
-    }
-    
     @Override
     protected void doStop() {
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
index ab97b500973..9a30a945909 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
@@ -20,11 +20,11 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.LogicalReplication;
@@ -48,7 +48,7 @@ import java.sql.SQLException;
  * PostgreSQL WAL dumper.
  */
 @Slf4j
-public final class PostgreSQLWalDumper extends AbstractIncrementalDumper<WalPosition> {
+public final class PostgreSQLWalDumper extends AbstractLifecycleExecutor implements IncrementalDumper {
     
     private final DumperConfiguration dumperConfig;
     
@@ -62,7 +62,6 @@ public final class PostgreSQLWalDumper extends AbstractIncrementalDumper<WalPosi
     
     public PostgreSQLWalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position,
                                final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
-        super(dumperConfig, position, channel, metaDataLoader);
         ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
                 () -> new UnsupportedSQLOperationException("PostgreSQLWalDumper only support PipelineDataSourceConfiguration"));
         this.dumperConfig = dumperConfig;
@@ -74,10 +73,6 @@ public final class PostgreSQLWalDumper extends AbstractIncrementalDumper<WalPosi
     
     @Override
     protected void runBlocking() {
-        dump();
-    }
-    
-    private void dump() {
         // TODO use unified PgConnection
         try (
                 Connection connection = logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig());
@@ -92,18 +87,13 @@ public final class PostgreSQLWalDumper extends AbstractIncrementalDumper<WalPosi
                     continue;
                 }
                 AbstractWalEvent event = decodingPlugin.decode(message, new PostgreSQLLogSequenceNumber(stream.getLastReceiveLSN()));
-                Record record = walEventConverter.convert(event);
-                pushRecord(record);
+                channel.pushRecord(walEventConverter.convert(event));
             }
         } catch (final SQLException ex) {
             throw new IngestException(ex);
         }
     }
     
-    private void pushRecord(final Record record) {
-        channel.pushRecord(record);
-    }
-    
     @Override
     protected void doStop() {
     }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
index 24c9d40189f..50b922cc0e0 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
@@ -17,19 +17,10 @@
 
 package org.apache.shardingsphere.data.pipeline.core.fixture;
 
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
+import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
 
-public final class FixtureIncrementalDumper extends AbstractIncrementalDumper<FinishedPosition> {
-    
-    public FixtureIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<FinishedPosition> position,
-                                    final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
-        super(dumperConfig, position, channel, metaDataLoader);
-    }
+public final class FixtureIncrementalDumper extends AbstractLifecycleExecutor implements IncrementalDumper {
     
     @Override
     protected void runBlocking() {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
index b0152f36f90..168bc9bd21e 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
@@ -39,7 +39,7 @@ public final class FixtureIncrementalDumperCreator implements IncrementalDumperC
     @Override
     public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<FinishedPosition> position,
                                                      final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
-        return new FixtureIncrementalDumper(dumperConfig, position, channel, metaDataLoader);
+        return new FixtureIncrementalDumper();
     }
     
     @Override