You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2022/01/28 02:21:33 UTC

[shardingsphere] branch master updated: Replace dataSourceFactory.newInstance to dataSourceManager.getDataSource for better performance and avoid possible connection leak (#15126)

This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f152d01  Replace dataSourceFactory.newInstance to dataSourceManager.getDataSource for better performance and avoid possible connection leak (#15126)
f152d01 is described below

commit f152d010d2e752308efc52664ddfe1f5f8248e54
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Fri Jan 28 10:19:09 2022 +0800

    Replace dataSourceFactory.newInstance to dataSourceManager.getDataSource for better performance and avoid possible connection leak (#15126)
    
    * Refactor resetTargetTable
    
    * Remove dataSourceFactory.newInstance in AbstractSingleTableDataCalculator
    
    * Reuse dataSourceManager in incremental dumper, make sure data source will be released later
    
    * Reuse dataSourceManager in WalEventConverter, make sure data source will be released later
    
    * Move DataSourcePreparer
    
    * Reuse dataSourceManager in DataSourcePreparer
    
    * Refactor dataSourceManager for job preparer
---
 .../core/api/impl/RuleAlteredJobAPIImpl.java       |  2 +-
 .../consistency/DataConsistencyCheckerImpl.java    |  3 +--
 .../core/datasource/PipelineDataSourceFactory.java |  1 -
 .../core/datasource/PipelineDataSourceManager.java | 26 ++++++++++++++--------
 .../ingest/dumper/AbstractIncrementalDumper.java   |  4 +++-
 .../datasource/AbstractDataSourcePreparer.java     | 13 +++++------
 .../prepare/datasource}/DataSourcePreparer.java    |  4 +---
 .../datasource/PrepareTargetTablesParameter.java   |  6 ++++-
 .../AbstractSingleTableDataCalculator.java         |  9 --------
 .../data/pipeline/core/task/IncrementalTask.java   |  2 +-
 .../rulealtered/RuleAlteredJobPreparer.java        | 10 ++++-----
 .../scaling/core/job/check/EnvironmentChecker.java |  2 +-
 .../core/job/check/EnvironmentCheckerFactory.java  |  2 +-
 .../scaling/core/job/dumper/DumperFactory.java     |  9 +++++---
 .../job/environment/ScalingEnvironmentManager.java | 14 ++++++------
 .../pipeline/mysql/MySQLEnvironmentChecker.java    |  2 +-
 .../mysql/ingest/MySQLIncrementalDumper.java       |  9 ++++----
 .../datasource/MySQLDataSourcePreparer.java        | 11 +++++----
 .../mysql/ingest/MySQLIncrementalDumperTest.java   |  2 +-
 .../datasource/MySQLDataSourcePreparerTest.java    |  2 +-
 .../opengauss/ingest/OpenGaussWalDumper.java       | 12 +++++-----
 .../datasource/OpenGaussDataSourcePreparer.java    |  5 ++---
 .../postgresql/PostgreSQLEnvironmentChecker.java   |  2 +-
 .../postgresql/ingest/PostgreSQLWalDumper.java     |  8 ++++---
 .../postgresql/ingest/wal/WalEventConverter.java   |  6 ++---
 .../postgresql/ingest/PostgreSQLWalDumperTest.java |  3 ++-
 .../ingest/wal/WalEventConverterTest.java          |  2 +-
 .../config/rulealtered/HandleConfiguration.java    | 11 +++++++++
 .../core/fixture/FixtureDataSourcePreparer.java    |  4 ++--
 .../core/fixture/FixtureEnvironmentChecker.java    |  2 +-
 .../core/fixture/FixtureIncrementalDumper.java     |  6 +++--
 .../datasource/AbstractDataSourcePreparerTest.java |  1 -
 32 files changed, 106 insertions(+), 89 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index 4655a4f..9854efa 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -322,7 +322,7 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
         log.info("Scaling job {} reset target table", jobId);
         PipelineAPIFactory.getGovernanceRepositoryAPI().deleteJobProgress(jobId);
         try {
-            new ScalingEnvironmentManager().resetTargetTable(new RuleAlteredJobContext(getJobConfig(jobId)));
+            new ScalingEnvironmentManager().cleanupTargetTables(getJobConfig(jobId));
         } catch (final SQLException ex) {
             throw new PipelineJobExecutionException("Reset target table failed for job " + jobId);
         }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
index 9a5693c..0e83983 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.core.check.consistency;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataCalculateParameter;
@@ -84,7 +83,7 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
         this.jobConfig = jobConfig;
         ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
         jobId = jobConfig.getHandleConfig().getJobId();
-        logicTableNames = Splitter.on(',').splitToList(jobConfig.getHandleConfig().getLogicTables());
+        logicTableNames = jobConfig.getHandleConfig().splitLogicTableNames();
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactory.java
index bf4f77d..2109afc 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactory.java
@@ -38,7 +38,6 @@ public final class PipelineDataSourceFactory {
      */
     @SneakyThrows(SQLException.class)
     public PipelineDataSourceWrapper newInstance(final PipelineDataSourceConfiguration dataSourceConfig) {
-        // TODO cache and reuse, try PipelineDataSourceManager
         DataSource pipelineDataSource = PipelineDataSourceCreatorFactory.getInstance(dataSourceConfig.getType()).createPipelineDataSource(dataSourceConfig.getDataSourceConfiguration());
         return new PipelineDataSourceWrapper(pipelineDataSource, dataSourceConfig.getDatabaseType());
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
index 38220ed..aa3f9ce 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
@@ -49,7 +49,10 @@ public final class PipelineDataSourceManager implements AutoCloseable {
      * @param pipelineDataSourceConfig pipeline data source configuration
      */
     public void createSourceDataSource(final PipelineDataSourceConfiguration pipelineDataSourceConfig) {
-        PipelineDataSourceWrapper dataSource = dataSourceFactory.newInstance(pipelineDataSourceConfig);
+        if (cachedDataSources.containsKey(pipelineDataSourceConfig)) {
+            return;
+        }
+        PipelineDataSourceWrapper dataSource = getDataSource(pipelineDataSourceConfig);
         cachedDataSources.put(pipelineDataSourceConfig, dataSource);
         sourceDataSources.put(pipelineDataSourceConfig, dataSource);
     }
@@ -60,27 +63,31 @@ public final class PipelineDataSourceManager implements AutoCloseable {
      * @param pipelineDataSourceConfig pipeline data source configuration
      */
     public void createTargetDataSource(final PipelineDataSourceConfiguration pipelineDataSourceConfig) {
-        PipelineDataSourceWrapper dataSource = dataSourceFactory.newInstance(pipelineDataSourceConfig);
+        if (cachedDataSources.containsKey(pipelineDataSourceConfig)) {
+            return;
+        }
+        PipelineDataSourceWrapper dataSource = getDataSource(pipelineDataSourceConfig);
         cachedDataSources.put(pipelineDataSourceConfig, dataSource);
         targetDataSources.put(pipelineDataSourceConfig, dataSource);
     }
     
     /**
-     * Get data source.
+     * Get cached data source.
      *
      * @param dataSourceConfig data source configuration
      * @return data source
      */
     public PipelineDataSourceWrapper getDataSource(final PipelineDataSourceConfiguration dataSourceConfig) {
-        // TODO re-init if existing dataSource was closed
-        if (cachedDataSources.containsKey(dataSourceConfig)) {
-            return cachedDataSources.get(dataSourceConfig);
+        PipelineDataSourceWrapper result = cachedDataSources.get(dataSourceConfig);
+        if (null != result) {
+            return result;
         }
         synchronized (cachedDataSources) {
-            if (cachedDataSources.containsKey(dataSourceConfig)) {
-                return cachedDataSources.get(dataSourceConfig);
+            result = cachedDataSources.get(dataSourceConfig);
+            if (null != result) {
+                return result;
             }
-            PipelineDataSourceWrapper result = dataSourceFactory.newInstance(dataSourceConfig);
+            result = dataSourceFactory.newInstance(dataSourceConfig);
             cachedDataSources.put(dataSourceConfig, result);
             return result;
         }
@@ -100,5 +107,6 @@ public final class PipelineDataSourceManager implements AutoCloseable {
         }
         cachedDataSources.clear();
         sourceDataSources.clear();
+        targetDataSources.clear();
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java
index 6e326e2..eba47d3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java
@@ -21,6 +21,7 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
 import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
 
 /**
@@ -30,6 +31,7 @@ import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDump
  */
 public abstract class AbstractIncrementalDumper<P> extends AbstractLifecycleExecutor implements IncrementalDumper {
     
-    public AbstractIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<P> position, final PipelineChannel channel) {
+    public AbstractIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<P> position,
+                                     final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index 9bf2f2b..dba0a18 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -23,8 +23,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.ActualTableDefinition;
 import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.TableDefinitionSQLType;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.DataSourcePreparer;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 
 import java.sql.Connection;
 import java.sql.SQLException;
@@ -48,14 +47,12 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
     
     private static final String[] IGNORE_EXCEPTION_MESSAGE = {"multiple primary keys for table", "already exists"};
     
-    private final PipelineDataSourceFactory dataSourceFactory = new PipelineDataSourceFactory();
-    
-    protected final PipelineDataSourceWrapper getSourceDataSource(final PipelineConfiguration pipelineConfig) {
-        return dataSourceFactory.newInstance(PipelineDataSourceConfigurationFactory.newInstance(pipelineConfig.getSource().getType(), pipelineConfig.getSource().getParameter()));
+    protected final PipelineDataSourceWrapper getSourceCachedDataSource(final PipelineConfiguration pipelineConfig, final PipelineDataSourceManager dataSourceManager) {
+        return dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(pipelineConfig.getSource().getType(), pipelineConfig.getSource().getParameter()));
     }
     
-    protected final PipelineDataSourceWrapper getTargetDataSource(final PipelineConfiguration pipelineConfig) {
-        return dataSourceFactory.newInstance(PipelineDataSourceConfigurationFactory.newInstance(pipelineConfig.getTarget().getType(), pipelineConfig.getTarget().getParameter()));
+    protected final PipelineDataSourceWrapper getTargetCachedDataSource(final PipelineConfiguration pipelineConfig, final PipelineDataSourceManager dataSourceManager) {
+        return dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(pipelineConfig.getTarget().getType(), pipelineConfig.getTarget().getParameter()));
     }
     
     protected final void executeTargetTableSQL(final Connection targetConnection, final String sql) throws SQLException {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/DataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/DataSourcePreparer.java
similarity index 86%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/DataSourcePreparer.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/DataSourcePreparer.java
index 1d9ccb1..4108e63 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/DataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/DataSourcePreparer.java
@@ -15,9 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.spi.rulealtered;
-
-import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.PrepareTargetTablesParameter;
+package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
 
 /**
  * Data source preparer.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/prepare/datasource/PrepareTargetTablesParameter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
similarity index 84%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/prepare/datasource/PrepareTargetTablesParameter.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
index 3b1b202..c24bce3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/prepare/datasource/PrepareTargetTablesParameter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
@@ -15,13 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.prepare.datasource;
+package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
 
 import lombok.Getter;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 
 /**
  * Prepare target tables parameter.
@@ -35,4 +36,7 @@ public final class PrepareTargetTablesParameter {
     
     @NonNull
     private final PipelineConfiguration pipelineConfiguration;
+    
+    @NonNull
+    private final PipelineDataSourceManager dataSourceManager;
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractSingleTableDataCalculator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractSingleTableDataCalculator.java
index e7bdf57..a955df5 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractSingleTableDataCalculator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractSingleTableDataCalculator.java
@@ -17,9 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.core.spi.check.consistency;
 
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator;
 
 import java.util.Properties;
@@ -29,14 +26,8 @@ import java.util.Properties;
  */
 public abstract class AbstractSingleTableDataCalculator implements SingleTableDataCalculator {
     
-    private final PipelineDataSourceFactory dataSourceFactory = new PipelineDataSourceFactory();
-    
     private Properties algorithmProps;
     
-    protected final PipelineDataSourceWrapper getDataSource(final PipelineDataSourceConfiguration dataSourceConfig) {
-        return dataSourceFactory.newInstance(dataSourceConfig);
-    }
-    
     @Override
     public Properties getAlgorithmProps() {
         return algorithmProps;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 166e689..f0a8c9b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -76,7 +76,7 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements
         IngestPosition<?> position = dumperConfig.getPosition();
         progress.setPosition(position);
         channel = createChannel(concurrency, pipelineChannelFactory, progress);
-        dumper = DumperFactory.createIncrementalDumper(dumperConfig, position, channel);
+        dumper = DumperFactory.createIncrementalDumper(dumperConfig, position, dataSourceManager, channel);
         importers = createImporters(concurrency, importerConfig, dataSourceManager, channel);
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index a298bb4..0bad421 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -24,17 +24,17 @@ import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
-import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.PrepareTargetTablesParameter;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.prepare.InventoryTaskSplitter;
 import org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.DataSourcePreparer;
 import org.apache.shardingsphere.scaling.core.job.check.EnvironmentCheckerFactory;
 import org.apache.shardingsphere.scaling.core.job.position.PositionInitializerFactory;
 
@@ -57,7 +57,7 @@ public final class RuleAlteredJobPreparer {
      */
     public void prepare(final RuleAlteredJobContext jobContext) {
         PipelineDataSourceManager dataSourceManager = jobContext.getDataSourceManager();
-        prepareTarget(jobContext.getJobConfig());
+        prepareTarget(jobContext.getJobConfig(), dataSourceManager);
         try {
             initDataSourceManager(dataSourceManager, jobContext.getTaskConfig());
             checkDataSource(jobContext, dataSourceManager);
@@ -71,14 +71,14 @@ public final class RuleAlteredJobPreparer {
         }
     }
     
-    private void prepareTarget(final JobConfiguration jobConfig) {
+    private void prepareTarget(final JobConfiguration jobConfig, final PipelineDataSourceManager dataSourceManager) {
         DataSourcePreparer dataSourcePreparer = EnvironmentCheckerFactory.getDataSourcePreparer(jobConfig.getHandleConfig().getTargetDatabaseType());
         if (null == dataSourcePreparer) {
             log.info("dataSourcePreparer null, ignore prepare target");
             return;
         }
         JobDataNodeLine tablesFirstDataNodes = JobDataNodeLine.unmarshal(jobConfig.getHandleConfig().getTablesFirstDataNodes());
-        PrepareTargetTablesParameter prepareTargetTablesParameter = new PrepareTargetTablesParameter(tablesFirstDataNodes, jobConfig.getPipelineConfig());
+        PrepareTargetTablesParameter prepareTargetTablesParameter = new PrepareTargetTablesParameter(tablesFirstDataNodes, jobConfig.getPipelineConfig(), dataSourceManager);
         dataSourcePreparer.prepareTargetTables(prepareTargetTablesParameter);
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/EnvironmentChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/EnvironmentChecker.java
index 4cc14ce..a65a00c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/EnvironmentChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/EnvironmentChecker.java
@@ -17,8 +17,8 @@
 
 package org.apache.shardingsphere.scaling.core.job.check;
 
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
 import org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.DataSourcePreparer;
 
 /**
  * Environment checker.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/EnvironmentCheckerFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/EnvironmentCheckerFactory.java
index fc1a672..68d8063 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/EnvironmentCheckerFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/EnvironmentCheckerFactory.java
@@ -21,8 +21,8 @@ import lombok.SneakyThrows;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCheckerImpl;
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
 import org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.DataSourcePreparer;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
 
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java
index 33e51e9..eed54f6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java
@@ -59,14 +59,17 @@ public final class DumperFactory {
      *
      * @param dumperConfig dumper configuration
      * @param position position
+     * @param dataSourceManager data source manager
      * @param channel channel
      * @return incremental dumper
      */
     @SneakyThrows(ReflectiveOperationException.class)
-    public static IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<?> position, final PipelineChannel channel) {
+    public static IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<?> position,
+                                                            final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
         String databaseType = dumperConfig.getDataSourceConfig().getDatabaseType().getName();
         ScalingEntry scalingEntry = ScalingEntryLoader.getInstance(databaseType);
-        Constructor<? extends IncrementalDumper> constructor = scalingEntry.getIncrementalDumperClass().getConstructor(DumperConfiguration.class, IngestPosition.class, PipelineChannel.class);
-        return constructor.newInstance(dumperConfig, position, channel);
+        Constructor<? extends IncrementalDumper> constructor = scalingEntry.getIncrementalDumperClass()
+                .getConstructor(DumperConfiguration.class, IngestPosition.class, PipelineDataSourceManager.class, PipelineChannel.class);
+        return constructor.newInstance(dumperConfig, position, dataSourceManager, channel);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
index 70d8d14..9c5e733 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
@@ -17,12 +17,12 @@
 
 package org.apache.shardingsphere.scaling.core.job.environment;
 
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -37,19 +37,19 @@ public final class ScalingEnvironmentManager {
     private final PipelineDataSourceFactory dataSourceFactory = new PipelineDataSourceFactory();
     
     /**
-     * Reset target table.
+     * Cleanup target tables.
      *
-     * @param jobContext job context
+     * @param jobConfig job configuration
      * @throws SQLException SQL exception
      */
     // TODO seems it should be removed, dangerous to use
-    public void resetTargetTable(final RuleAlteredJobContext jobContext) throws SQLException {
-        Collection<String> tables = jobContext.getTaskConfig().getDumperConfig().getTableNameMap().values();
-        YamlPipelineDataSourceConfiguration target = jobContext.getJobConfig().getPipelineConfig().getTarget();
+    public void cleanupTargetTables(final JobConfiguration jobConfig) throws SQLException {
+        Collection<String> tables = jobConfig.getHandleConfig().splitLogicTableNames();
+        YamlPipelineDataSourceConfiguration target = jobConfig.getPipelineConfig().getTarget();
         try (PipelineDataSourceWrapper dataSource = dataSourceFactory.newInstance(PipelineDataSourceConfigurationFactory.newInstance(target.getType(), target.getParameter()));
              Connection connection = dataSource.getConnection()) {
             for (String each : tables) {
-                String sql = PipelineSQLBuilderFactory.getSQLBuilder(jobContext.getJobConfig().getHandleConfig().getTargetDatabaseType()).buildTruncateSQL(each);
+                String sql = PipelineSQLBuilderFactory.getSQLBuilder(jobConfig.getHandleConfig().getTargetDatabaseType()).buildTruncateSQL(each);
                 try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
                     preparedStatement.execute();
                 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLEnvironmentChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLEnvironmentChecker.java
index 7df5d4c..0b74019 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLEnvironmentChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLEnvironmentChecker.java
@@ -17,9 +17,9 @@
 
 package org.apache.shardingsphere.data.pipeline.mysql;
 
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
 import org.apache.shardingsphere.data.pipeline.mysql.check.datasource.MySQLDataSourceChecker;
 import org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource.MySQLDataSourcePreparer;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.DataSourcePreparer;
 import org.apache.shardingsphere.scaling.core.job.check.EnvironmentChecker;
 
 /**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 5395875..b87c985 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -30,7 +30,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
@@ -78,13 +78,14 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
         VALUE_HANDLER_MAP = SingletonSPIRegistry.getSingletonInstancesMap(ValueHandler.class, ValueHandler::getTypeName);
     }
     
-    public MySQLIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<BinlogPosition> binlogPosition, final PipelineChannel channel) {
-        super(dumperConfig, binlogPosition, channel);
+    public MySQLIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<BinlogPosition> binlogPosition,
+                                  final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
+        super(dumperConfig, binlogPosition, dataSourceManager, channel);
         this.binlogPosition = (BinlogPosition) binlogPosition;
         this.dumperConfig = dumperConfig;
         Preconditions.checkArgument(dumperConfig.getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration");
         this.channel = channel;
-        columnMetaDataLoader = new MySQLColumnMetaDataLoader(new PipelineDataSourceFactory().newInstance(dumperConfig.getDataSourceConfig()));
+        columnMetaDataLoader = new MySQLColumnMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
index 20667db..5677ec4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
@@ -20,10 +20,10 @@ package org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
-import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.PrepareTargetTablesParameter;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder.MySQLPipelineSQLBuilder;
 
@@ -45,10 +45,9 @@ public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer {
     @Override
     public void prepareTargetTables(final PrepareTargetTablesParameter parameter) {
         PipelineConfiguration pipelineConfig = parameter.getPipelineConfiguration();
-        try (PipelineDataSourceWrapper sourceDataSource = getSourceDataSource(pipelineConfig);
-             Connection sourceConnection = sourceDataSource.getConnection();
-             PipelineDataSourceWrapper targetDataSource = getTargetDataSource(pipelineConfig);
-             Connection targetConnection = targetDataSource.getConnection()) {
+        PipelineDataSourceManager dataSourceManager = parameter.getDataSourceManager();
+        try (Connection sourceConnection = getSourceCachedDataSource(pipelineConfig, dataSourceManager).getConnection();
+             Connection targetConnection = getTargetCachedDataSource(pipelineConfig, dataSourceManager).getConnection()) {
             Collection<String> logicTableNames = parameter.getTablesFirstDataNodes().getEntries().stream().map(JobDataNodeEntry::getLogicTableName).collect(Collectors.toList());
             for (String each : logicTableNames) {
                 String createTableSQL = getCreateTableSQL(sourceConnection, each);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index c732e56..1b44d6a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -61,7 +61,7 @@ public final class MySQLIncrementalDumperTest {
         DumperConfiguration dumperConfig = mockDumperConfiguration();
         initTableData(dumperConfig);
         channel = new MultiplexMemoryPipelineChannel();
-        incrementalDumper = new MySQLIncrementalDumper(dumperConfig, new BinlogPosition("binlog-000001", 4L), channel);
+        incrementalDumper = new MySQLIncrementalDumper(dumperConfig, new BinlogPosition("binlog-000001", 4L), new PipelineDataSourceManager(), channel);
     }
     
     private DumperConfiguration mockDumperConfiguration() {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
index ef7e7e7..dde7537 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
@@ -21,9 +21,9 @@ import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineCo
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.PrepareTargetTablesParameter;
 import org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
index 5c0c601..a91ba15 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
@@ -20,11 +20,12 @@ package org.apache.shardingsphere.data.pipeline.opengauss.ingest;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
 import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.OpenGaussLogicalReplication;
@@ -36,7 +37,6 @@ import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingPlugin;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWalEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
 import org.opengauss.jdbc.PgConnection;
 import org.opengauss.replication.PGReplicationStream;
 
@@ -49,7 +49,7 @@ import java.sql.SQLException;
  * OpenGauss WAL dumper.
  */
 @Slf4j
-public final class OpenGaussWalDumper extends AbstractLifecycleExecutor implements IncrementalDumper {
+public final class OpenGaussWalDumper extends AbstractIncrementalDumper<WalPosition> {
     
     private final WalPosition walPosition;
     
@@ -63,14 +63,16 @@ public final class OpenGaussWalDumper extends AbstractLifecycleExecutor implemen
     
     private final PipelineChannel channel;
     
-    public OpenGaussWalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position, final PipelineChannel channel) {
+    public OpenGaussWalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position,
+                              final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
+        super(dumperConfig, position, dataSourceManager, channel);
         walPosition = (WalPosition) position;
         if (!StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass())) {
             throw new UnsupportedOperationException("PostgreSQLWalDumper only support PipelineDataSourceConfiguration");
         }
         this.dumperConfig = dumperConfig;
         this.channel = channel;
-        walEventConverter = new WalEventConverter(dumperConfig);
+        walEventConverter = new WalEventConverter(dumperConfig, dataSourceManager);
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
index e72c557..f70538a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
@@ -23,11 +23,11 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.ActualTableDefinition;
-import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.PrepareTargetTablesParameter;
 import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.TableDefinitionSQLType;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
 import org.apache.shardingsphere.infra.datanode.DataNode;
 
 import java.sql.Connection;
@@ -60,8 +60,7 @@ public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePrepare
             throw new PipelineJobPrepareFailedException("get table definitions failed.", ex);
         }
         Map<String, Collection<String>> createLogicTableSQLs = getCreateLogicTableSQLs(actualTableDefinitions);
-        try (PipelineDataSourceWrapper targetDataSource = getTargetDataSource(parameter.getPipelineConfiguration());
-             Connection targetConnection = targetDataSource.getConnection()) {
+        try (Connection targetConnection = getTargetCachedDataSource(parameter.getPipelineConfiguration(), parameter.getDataSourceManager()).getConnection()) {
             for (Entry<String, Collection<String>> entry : createLogicTableSQLs.entrySet()) {
                 for (String each : entry.getValue()) {
                     executeTargetTableSQL(targetConnection, each);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLEnvironmentChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLEnvironmentChecker.java
index ad86d45..49f67a3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLEnvironmentChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLEnvironmentChecker.java
@@ -17,8 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.postgresql;
 
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
 import org.apache.shardingsphere.data.pipeline.postgresql.check.datasource.PostgreSQLDataSourceChecker;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.DataSourcePreparer;
 import org.apache.shardingsphere.scaling.core.job.check.EnvironmentChecker;
 
 /**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
index 7a0acd7..4ae61bc 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Standa
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
@@ -58,15 +59,16 @@ public final class PostgreSQLWalDumper extends AbstractIncrementalDumper<WalPosi
     
     private final PipelineChannel channel;
     
-    public PostgreSQLWalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position, final PipelineChannel channel) {
-        super(dumperConfig, position, channel);
+    public PostgreSQLWalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position,
+                               final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
+        super(dumperConfig, position, dataSourceManager, channel);
         walPosition = (WalPosition) position;
         if (!StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass())) {
             throw new UnsupportedOperationException("PostgreSQLWalDumper only support PipelineDataSourceConfiguration");
         }
         this.dumperConfig = dumperConfig;
         this.channel = channel;
-        walEventConverter = new WalEventConverter(dumperConfig);
+        walEventConverter = new WalEventConverter(dumperConfig, dataSourceManager);
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
index 38c3155..c660ed0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
@@ -22,7 +22,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineMetaDataManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
@@ -47,10 +47,10 @@ public final class WalEventConverter {
     
     private final PipelineMetaDataManager metaDataManager;
     
-    public WalEventConverter(final DumperConfiguration dumperConfig) {
+    public WalEventConverter(final DumperConfiguration dumperConfig, final PipelineDataSourceManager dataSourceManager) {
         this.dumperConfig = dumperConfig;
         databaseType = dumperConfig.getDataSourceConfig().getDatabaseType();
-        metaDataManager = new PipelineMetaDataManager(new PipelineDataSourceFactory().newInstance(dumperConfig.getDataSourceConfig()));
+        metaDataManager = new PipelineMetaDataManager(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
     }
     
     /**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
index fdbac8a..c54da70 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
 
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MultiplexMemoryPipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
 import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
@@ -70,7 +71,7 @@ public final class PostgreSQLWalDumperTest {
     public void setUp() {
         position = new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));
         channel = new MultiplexMemoryPipelineChannel();
-        walDumper = new PostgreSQLWalDumper(mockDumperConfiguration(), position, channel);
+        walDumper = new PostgreSQLWalDumper(mockDumperConfiguration(), position, new PipelineDataSourceManager(), channel);
     }
     
     private DumperConfiguration mockDumperConfiguration() {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
index 53e225c..1a18864 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
@@ -51,7 +51,7 @@ public final class WalEventConverterTest {
     @Before
     public void setUp() {
         DumperConfiguration dumperConfig = mockDumperConfiguration();
-        walEventConverter = new WalEventConverter(dumperConfig);
+        walEventConverter = new WalEventConverter(dumperConfig, new PipelineDataSourceManager());
         initTableData(dumperConfig);
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/HandleConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/HandleConfiguration.java
index efe69a3..feebbb3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/HandleConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/HandleConfiguration.java
@@ -17,11 +17,13 @@
 
 package org.apache.shardingsphere.data.pipeline.api.config.rulealtered;
 
+import com.google.common.base.Splitter;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
 import lombok.ToString;
 
+import java.util.Collection;
 import java.util.List;
 
 /**
@@ -72,4 +74,13 @@ public final class HandleConfiguration {
     public int getJobShardingCount() {
         return null == jobShardingDataNodes ? 0 : jobShardingDataNodes.size();
     }
+    
+    /**
+     * Split {@linkplain #logicTables} to logic table names.
+     *
+     * @return logic table names
+     */
+    public Collection<String> splitLogicTableNames() {
+        return Splitter.on(',').splitToList(logicTables);
+    }
 }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataSourcePreparer.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataSourcePreparer.java
index eb6c277..b086621 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataSourcePreparer.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataSourcePreparer.java
@@ -17,8 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.core.fixture;
 
-import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.PrepareTargetTablesParameter;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.DataSourcePreparer;
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
 
 public final class FixtureDataSourcePreparer implements DataSourcePreparer {
     
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureEnvironmentChecker.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureEnvironmentChecker.java
index 4afe4a1..b540ae4 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureEnvironmentChecker.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureEnvironmentChecker.java
@@ -17,8 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.core.fixture;
 
+import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
 import org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.DataSourcePreparer;
 import org.apache.shardingsphere.scaling.core.job.check.EnvironmentChecker;
 
 public final class FixtureEnvironmentChecker implements EnvironmentChecker {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
index 096d6b8..32bec41 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
@@ -21,12 +21,14 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
 
 public final class FixtureIncrementalDumper extends AbstractIncrementalDumper<FinishedPosition> {
     
-    public FixtureIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<FinishedPosition> position, final PipelineChannel channel) {
-        super(dumperConfig, position, channel);
+    public FixtureIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<FinishedPosition> position,
+                                    final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
+        super(dumperConfig, position, dataSourceManager, channel);
     }
     
     @Override
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparerTest.java
index a80e96a..c2eb2a1 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparerTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparerTest.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.PrepareTargetTablesParameter;
 import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.TableDefinitionSQLType;
 import org.junit.Test;