You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/10/03 13:49:56 UTC
[shardingsphere] branch master updated: Remove AbstractIncrementalDumper (#21330)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 67ca6947f89 Remove AbstractIncrementalDumper (#21330)
67ca6947f89 is described below
commit 67ca6947f89ce49debc6ed708aa619848327be6f
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Mon Oct 3 21:49:46 2022 +0800
Remove AbstractIncrementalDumper (#21330)
---
.../ingest/dumper/AbstractIncrementalDumper.java | 37 ---------------
.../datasource/AbstractDataSourcePreparer.java | 1 +
.../mysql/ingest/MySQLIncrementalDumper.java | 54 +++++++++-------------
.../opengauss/ingest/OpenGaussWalDumper.java | 18 ++------
.../postgresql/ingest/PostgreSQLWalDumper.java | 18 ++------
.../core/fixture/FixtureIncrementalDumper.java | 15 ++----
.../fixture/FixtureIncrementalDumperCreator.java | 2 +-
7 files changed, 34 insertions(+), 111 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java
deleted file mode 100644
index bd396dac9cb..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
-
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
-
-/**
- * Abstract incremental dumper.
- *
- * @param <P> generic type of {@linkplain IngestPosition}
- */
-public abstract class AbstractIncrementalDumper<P> extends AbstractLifecycleExecutor implements IncrementalDumper {
-
- public AbstractIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<P> position,
- final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
- }
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index 13a134fcd9b..c41301f1190 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -83,6 +83,7 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
statement.execute(sql);
}
} catch (final SQLException ignored) {
+ // TODO should not ignore the exception, if do not catch it, the scaling IT will fail.
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 9d3cee95a05..c74e7ab13ff 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -22,20 +22,20 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJdbcConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractRowsEvent;
@@ -59,7 +59,7 @@ import java.util.Optional;
* MySQL incremental dumper.
*/
@Slf4j
-public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<BinlogPosition> {
+public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor implements IncrementalDumper {
private final DumperConfiguration dumperConfig;
@@ -75,7 +75,6 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
public MySQLIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<BinlogPosition> binlogPosition,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
- super(dumperConfig, binlogPosition, channel, metaDataLoader);
Preconditions.checkArgument(dumperConfig.getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration");
this.dumperConfig = dumperConfig;
this.binlogPosition = (BinlogPosition) binlogPosition;
@@ -90,10 +89,6 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
@Override
protected void runBlocking() {
- dump();
- }
-
- private void dump() {
client.connect();
client.subscribe(binlogPosition.getFilename(), binlogPosition.getPosition());
int eventCount = 0;
@@ -105,11 +100,11 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
eventCount += handleEvent(event);
}
log.info("incremental dump, eventCount={}", eventCount);
- pushRecord(new FinishedRecord(new PlaceholderPosition()));
+ channel.pushRecord(new FinishedRecord(new PlaceholderPosition()));
}
private int handleEvent(final AbstractBinlogEvent event) {
- if (event instanceof PlaceholderEvent || filter(catalog, (AbstractRowsEvent) event)) {
+ if (event instanceof PlaceholderEvent || !((AbstractRowsEvent) event).getDatabaseName().equals(catalog) || !dumperConfig.containsTable(((AbstractRowsEvent) event).getTableName())) {
createPlaceholderRecord(event);
return 0;
}
@@ -117,21 +112,28 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((WriteRowsEvent) event).getTableName());
handleWriteRowsEvent((WriteRowsEvent) event, tableMetaData);
return 1;
- } else if (event instanceof UpdateRowsEvent) {
+ }
+ if (event instanceof UpdateRowsEvent) {
PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((UpdateRowsEvent) event).getTableName());
handleUpdateRowsEvent((UpdateRowsEvent) event, tableMetaData);
return 1;
- } else if (event instanceof DeleteRowsEvent) {
+ }
+ if (event instanceof DeleteRowsEvent) {
PipelineTableMetaData tableMetaData = getPipelineTableMetaData(((DeleteRowsEvent) event).getTableName());
handleDeleteRowsEvent((DeleteRowsEvent) event, tableMetaData);
return 1;
- } else {
- return 0;
}
+ return 0;
+ }
+
+ private void createPlaceholderRecord(final AbstractBinlogEvent event) {
+ PlaceholderRecord record = new PlaceholderRecord(new BinlogPosition(event.getFileName(), event.getPosition(), event.getServerId()));
+ record.setCommitTime(event.getTimestamp() * 1000L);
+ channel.pushRecord(record);
}
- private boolean filter(final String database, final AbstractRowsEvent event) {
- return !event.getDatabaseName().equals(database) || !dumperConfig.containsTable(event.getTableName());
+ private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) {
+ return metaDataLoader.getTableMetaData(dumperConfig.getSchemaName(new ActualTableName(actualTableName)), actualTableName);
}
private void handleWriteRowsEvent(final WriteRowsEvent event, final PipelineTableMetaData tableMetaData) {
@@ -142,14 +144,10 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
record.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, tableMetaData.getColumnMetaData(i + 1).isUniqueKey()));
}
- pushRecord(record);
+ channel.pushRecord(record);
}
}
- private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) {
- return metaDataLoader.getTableMetaData(dumperConfig.getSchemaName(new ActualTableName(actualTableName)), actualTableName);
- }
-
private void handleUpdateRowsEvent(final UpdateRowsEvent event, final PipelineTableMetaData tableMetaData) {
for (int i = 0; i < event.getBeforeRows().size(); i++) {
Serializable[] beforeValues = event.getBeforeRows().get(i);
@@ -165,7 +163,7 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
(columnMetaData.isPrimaryKey() && updated) ? handleValue(columnMetaData, oldValue) : null,
handleValue(columnMetaData, newValue), updated, columnMetaData.isPrimaryKey()));
}
- pushRecord(record);
+ channel.pushRecord(record);
}
}
@@ -177,7 +175,7 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
record.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, tableMetaData.getColumnMetaData(i + 1).isUniqueKey()));
}
- pushRecord(record);
+ channel.pushRecord(record);
}
}
@@ -193,16 +191,6 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
return result;
}
- private void createPlaceholderRecord(final AbstractBinlogEvent event) {
- PlaceholderRecord record = new PlaceholderRecord(new BinlogPosition(event.getFileName(), event.getPosition(), event.getServerId()));
- record.setCommitTime(event.getTimestamp() * 1000);
- pushRecord(record);
- }
-
- private void pushRecord(final Record record) {
- channel.pushRecord(record);
- }
-
@Override
protected void doStop() {
if (null != client) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
index 7660c49d3ca..247a48bab72 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
@@ -20,11 +20,11 @@ package org.apache.shardingsphere.data.pipeline.opengauss.ingest;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.OpenGaussLogicalReplication;
@@ -47,7 +47,7 @@ import java.sql.SQLException;
* WAL dumper of openGauss.
*/
@Slf4j
-public final class OpenGaussWalDumper extends AbstractIncrementalDumper<WalPosition> {
+public final class OpenGaussWalDumper extends AbstractLifecycleExecutor implements IncrementalDumper {
private final DumperConfiguration dumperConfig;
@@ -61,7 +61,6 @@ public final class OpenGaussWalDumper extends AbstractIncrementalDumper<WalPosit
public OpenGaussWalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
- super(dumperConfig, position, channel, metaDataLoader);
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
() -> new UnsupportedSQLOperationException("PostgreSQLWalDumper only support PipelineDataSourceConfiguration"));
this.dumperConfig = dumperConfig;
@@ -73,10 +72,6 @@ public final class OpenGaussWalDumper extends AbstractIncrementalDumper<WalPosit
@Override
protected void runBlocking() {
- dump();
- }
-
- private void dump() {
PGReplicationStream stream = null;
try (PgConnection connection = getReplicationConnectionUnwrap()) {
stream = logicalReplication.createReplicationStream(connection, walPosition.getLogSequenceNumber(), OpenGaussPositionInitializer.getUniqueSlotName(connection, dumperConfig.getJobId()));
@@ -88,8 +83,7 @@ public final class OpenGaussWalDumper extends AbstractIncrementalDumper<WalPosit
continue;
}
AbstractWalEvent event = decodingPlugin.decode(message, new OpenGaussLogSequenceNumber(stream.getLastReceiveLSN()));
- Record record = walEventConverter.convert(event);
- pushRecord(record);
+ channel.pushRecord(walEventConverter.convert(event));
}
} catch (final SQLException ex) {
throw new IngestException(ex);
@@ -107,10 +101,6 @@ public final class OpenGaussWalDumper extends AbstractIncrementalDumper<WalPosit
return logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig()).unwrap(PgConnection.class);
}
- private void pushRecord(final Record record) {
- channel.pushRecord(record);
- }
-
@Override
protected void doStop() {
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
index ab97b500973..9a30a945909 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
@@ -20,11 +20,11 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.LogicalReplication;
@@ -48,7 +48,7 @@ import java.sql.SQLException;
* PostgreSQL WAL dumper.
*/
@Slf4j
-public final class PostgreSQLWalDumper extends AbstractIncrementalDumper<WalPosition> {
+public final class PostgreSQLWalDumper extends AbstractLifecycleExecutor implements IncrementalDumper {
private final DumperConfiguration dumperConfig;
@@ -62,7 +62,6 @@ public final class PostgreSQLWalDumper extends AbstractIncrementalDumper<WalPosi
public PostgreSQLWalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
- super(dumperConfig, position, channel, metaDataLoader);
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
() -> new UnsupportedSQLOperationException("PostgreSQLWalDumper only support PipelineDataSourceConfiguration"));
this.dumperConfig = dumperConfig;
@@ -74,10 +73,6 @@ public final class PostgreSQLWalDumper extends AbstractIncrementalDumper<WalPosi
@Override
protected void runBlocking() {
- dump();
- }
-
- private void dump() {
// TODO use unified PgConnection
try (
Connection connection = logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig());
@@ -92,18 +87,13 @@ public final class PostgreSQLWalDumper extends AbstractIncrementalDumper<WalPosi
continue;
}
AbstractWalEvent event = decodingPlugin.decode(message, new PostgreSQLLogSequenceNumber(stream.getLastReceiveLSN()));
- Record record = walEventConverter.convert(event);
- pushRecord(record);
+ channel.pushRecord(walEventConverter.convert(event));
}
} catch (final SQLException ex) {
throw new IngestException(ex);
}
}
- private void pushRecord(final Record record) {
- channel.pushRecord(record);
- }
-
@Override
protected void doStop() {
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
index 24c9d40189f..50b922cc0e0 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
@@ -17,19 +17,10 @@
package org.apache.shardingsphere.data.pipeline.core.fixture;
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
+import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
-public final class FixtureIncrementalDumper extends AbstractIncrementalDumper<FinishedPosition> {
-
- public FixtureIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<FinishedPosition> position,
- final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
- super(dumperConfig, position, channel, metaDataLoader);
- }
+public final class FixtureIncrementalDumper extends AbstractLifecycleExecutor implements IncrementalDumper {
@Override
protected void runBlocking() {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
index b0152f36f90..168bc9bd21e 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
@@ -39,7 +39,7 @@ public final class FixtureIncrementalDumperCreator implements IncrementalDumperC
@Override
public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<FinishedPosition> position,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
- return new FixtureIncrementalDumper(dumperConfig, position, channel, metaDataLoader);
+ return new FixtureIncrementalDumper();
}
@Override