You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/01/30 02:09:55 UTC

[shardingsphere] branch master updated: Reuse sourceDataSource and sourceMetaDataLoader in scaling job process (#15190)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new e915749  Reuse sourceDataSource and sourceMetaDataLoader in scaling job process (#15190)
e915749 is described below

commit e91574954dca0ee449d62511b29f591e3d54abc8
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Sun Jan 30 10:08:02 2022 +0800

    Reuse sourceDataSource and sourceMetaDataLoader in scaling job process (#15190)
    
    * Simplify parameters and fields
    
    * Initialize sourceDataSource and sourceMetaDataLoader in job context
    
    * Reuse sourceDataSource and sourceMetaDataLoader in job process
    
    * Unit test
---
 .../ingest/dumper/AbstractIncrementalDumper.java   |  4 +--
 .../ingest/dumper/AbstractInventoryDumper.java     | 39 +++++++++++++---------
 .../data/pipeline/core/task/IncrementalTask.java   | 10 +++---
 .../data/pipeline/core/task/InventoryTask.java     | 10 +++---
 .../rulealtered/RuleAlteredJobContext.java         | 39 ++++++++++++++++++++++
 .../rulealtered/RuleAlteredJobPreparer.java        | 15 +++++----
 .../rulealtered/prepare/InventoryTaskSplitter.java | 18 +++++-----
 .../scaling/core/job/dumper/DumperFactory.java     | 21 +++++++-----
 .../mysql/ingest/MySQLIncrementalDumper.java       |  7 ++--
 .../mysql/ingest/MySQLInventoryDumper.java         |  8 +++--
 .../mysql/ingest/MySQLIncrementalDumperTest.java   | 12 ++++++-
 .../pipeline/mysql/ingest/MySQLJdbcDumperTest.java | 21 ++++++------
 .../opengauss/ingest/OpenGaussWalDumper.java       |  8 ++---
 .../ingest/PostgreSQLInventoryDumper.java          |  8 +++--
 .../postgresql/ingest/PostgreSQLWalDumper.java     |  8 ++---
 .../postgresql/ingest/wal/WalEventConverter.java   |  5 ++-
 .../ingest/PostgreSQLJdbcDumperTest.java           | 17 ++++++----
 .../postgresql/ingest/PostgreSQLWalDumperTest.java | 25 ++++++++++----
 .../ingest/wal/WalEventConverterTest.java          | 11 +++++-
 .../api/impl/GovernanceRepositoryAPIImplTest.java  | 10 ++++--
 .../core/fixture/FixtureIncrementalDumper.java     |  6 ++--
 .../core/fixture/FixtureInventoryDumper.java       |  8 +++--
 .../pipeline/core/task/IncrementalTaskTest.java    |  6 +++-
 .../data/pipeline/core/task/InventoryTaskTest.java | 17 ++++++++--
 .../prepare/InventoryTaskSplitterTest.java         | 12 +++----
 25 files changed, 228 insertions(+), 117 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java
index eba47d3..f36e16e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java
@@ -21,7 +21,7 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
 import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
 
 /**
@@ -32,6 +32,6 @@ import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDump
 public abstract class AbstractIncrementalDumper<P> extends AbstractLifecycleExecutor implements IncrementalDumper {
     
     public AbstractIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<P> position,
-                                     final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
+                                     final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
index 063941a..b537ee1 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
@@ -19,9 +19,11 @@ package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
 
 import lombok.AccessLevel;
 import lombok.Getter;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.concurrent.ConcurrentException;
+import org.apache.commons.lang3.concurrent.LazyInitializer;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
@@ -34,7 +36,6 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
@@ -42,6 +43,7 @@ import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTable
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 
+import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -62,29 +64,28 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
     
     private final JobRateLimitAlgorithm rateLimitAlgorithm;
     
-    private final PipelineDataSourceManager dataSourceManager;
-    
-    private final PipelineTableMetaData tableMetaData;
+    private final LazyInitializer<PipelineTableMetaData> tableMetaDataLazyInitializer;
     
     private final PipelineChannel channel;
     
-    protected AbstractInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
+    private final DataSource dataSource;
+    
+    protected AbstractInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineChannel channel,
+                                      final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
         if (!StandardPipelineDataSourceConfiguration.class.equals(inventoryDumperConfig.getDataSourceConfig().getClass())) {
             throw new UnsupportedOperationException("AbstractInventoryDumper only support StandardPipelineDataSourceConfiguration");
         }
         this.inventoryDumperConfig = inventoryDumperConfig;
         this.batchSize = inventoryDumperConfig.getBatchSize();
         this.rateLimitAlgorithm = inventoryDumperConfig.getRateLimitAlgorithm();
-        this.dataSourceManager = dataSourceManager;
+        tableMetaDataLazyInitializer = new LazyInitializer<PipelineTableMetaData>() {
+            @Override
+            protected PipelineTableMetaData initialize() {
+                return metaDataLoader.getTableMetaData(inventoryDumperConfig.getTableName());
+            }
+        };
         this.channel = channel;
-        tableMetaData = createTableMetaData();
-    }
-    
-    private PipelineTableMetaData createTableMetaData() {
-        PipelineDataSourceConfiguration dataSourceConfig = inventoryDumperConfig.getDataSourceConfig();
-        // TODO share PipelineTableMetaDataLoader
-        PipelineTableMetaDataLoader metaDataManager = new PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dataSourceConfig));
-        return metaDataManager.getTableMetaData(inventoryDumperConfig.getTableName());
+        this.dataSource = dataSource;
     }
     
     @Override
@@ -96,7 +97,7 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
         String sql = getDumpSQL();
         IngestPosition<?> position = inventoryDumperConfig.getPosition();
         log.info("inventory dump, sql={}, position={}", sql, position);
-        try (Connection conn = dataSourceManager.getDataSource(inventoryDumperConfig.getDataSourceConfig()).getConnection()) {
+        try (Connection conn = dataSource.getConnection()) {
             int round = 1;
             Number startUniqueKeyValue = getPositionBeginValue(position) - 1;
             Optional<Number> maxUniqueKeyValue;
@@ -123,10 +124,16 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
         return "SELECT * FROM " + tableName + " WHERE " + primaryKey + " > ? AND " + primaryKey + " <= ? ORDER BY " + primaryKey + " ASC LIMIT ?";
     }
     
+    @SneakyThrows(ConcurrentException.class)
+    private PipelineTableMetaData getTableMetaData() {
+        return tableMetaDataLazyInitializer.get();
+    }
+    
     private Optional<Number> dump0(final Connection conn, final String sql, final Number startUniqueKeyValue, final int round) throws SQLException {
         if (null != rateLimitAlgorithm) {
             rateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
         }
+        PipelineTableMetaData tableMetaData = getTableMetaData();
         try (PreparedStatement preparedStatement = createPreparedStatement(conn, sql)) {
             preparedStatement.setObject(1, startUniqueKeyValue);
             preparedStatement.setObject(2, getPositionEndValue(inventoryDumperConfig.getPosition()));
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index f0a8c9b..a40065e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
@@ -47,7 +48,7 @@ import java.util.concurrent.Future;
  * Incremental task.
  */
 @Slf4j
-@ToString(exclude = {"incrementalDumperExecuteEngine", "dataSourceManager", "dumper", "progress"})
+@ToString(exclude = {"incrementalDumperExecuteEngine", "channel", "dumper", "importers", "progress"})
 public final class IncrementalTask extends AbstractLifecycleExecutor implements PipelineTask, AutoCloseable {
     
     @Getter
@@ -55,8 +56,6 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements
     
     private final ExecuteEngine incrementalDumperExecuteEngine;
     
-    private final PipelineDataSourceManager dataSourceManager;
-    
     private final PipelineChannel channel;
     
     private final Dumper dumper;
@@ -68,15 +67,14 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements
     
     public IncrementalTask(final int concurrency, final DumperConfiguration dumperConfig, final ImporterConfiguration importerConfig,
                            final PipelineChannelFactory pipelineChannelFactory, final PipelineDataSourceManager dataSourceManager,
-                           final ExecuteEngine incrementalDumperExecuteEngine) {
+                           final PipelineTableMetaDataLoader sourceMetaDataLoader, final ExecuteEngine incrementalDumperExecuteEngine) {
         this.incrementalDumperExecuteEngine = incrementalDumperExecuteEngine;
-        this.dataSourceManager = dataSourceManager;
         taskId = dumperConfig.getDataSourceName();
         progress = new IncrementalTaskProgress();
         IngestPosition<?> position = dumperConfig.getPosition();
         progress.setPosition(position);
         channel = createChannel(concurrency, pipelineChannelFactory, progress);
-        dumper = DumperFactory.createIncrementalDumper(dumperConfig, position, dataSourceManager, channel);
+        dumper = DumperFactory.createIncrementalDumper(dumperConfig, position, channel, sourceMetaDataLoader);
         importers = createImporters(concurrency, importerConfig, dataSourceManager, channel);
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index ffd0f97..b4d814b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -32,12 +32,14 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
 import org.apache.shardingsphere.scaling.core.job.dumper.DumperFactory;
 import org.apache.shardingsphere.scaling.core.job.importer.ImporterFactory;
 
+import javax.sql.DataSource;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -46,7 +48,7 @@ import java.util.concurrent.Future;
  * Inventory task.
  */
 @Slf4j
-@ToString(exclude = {"importerExecuteEngine", "dataSourceManager", "dumper"})
+@ToString(exclude = {"importerExecuteEngine", "channel", "dumper", "importer"})
 public final class InventoryTask extends AbstractLifecycleExecutor implements PipelineTask, AutoCloseable {
     
     @Getter
@@ -54,8 +56,6 @@ public final class InventoryTask extends AbstractLifecycleExecutor implements Pi
     
     private final ExecuteEngine importerExecuteEngine;
     
-    private final PipelineDataSourceManager dataSourceManager;
-    
     private final PipelineChannel channel;
     
     private final Dumper dumper;
@@ -66,12 +66,12 @@ public final class InventoryTask extends AbstractLifecycleExecutor implements Pi
     
     public InventoryTask(final InventoryDumperConfiguration inventoryDumperConfig, final ImporterConfiguration importerConfig,
                          final PipelineChannelFactory pipelineChannelFactory, final PipelineDataSourceManager dataSourceManager,
+                         final DataSource sourceDataSource, final PipelineTableMetaDataLoader sourceMetaDataLoader,
                          final ExecuteEngine importerExecuteEngine) {
         this.importerExecuteEngine = importerExecuteEngine;
-        this.dataSourceManager = dataSourceManager;
         taskId = generateTaskId(inventoryDumperConfig);
         channel = createChannel(pipelineChannelFactory);
-        dumper = DumperFactory.createInventoryDumper(inventoryDumperConfig, dataSourceManager, channel);
+        dumper = DumperFactory.createInventoryDumper(inventoryDumperConfig, channel, sourceDataSource, sourceMetaDataLoader);
         importer = ImporterFactory.createImporter(importerConfig, dataSourceManager, channel);
         position = inventoryDumperConfig.getPosition();
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
index 4612f5c..32562af 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
@@ -19,12 +19,17 @@ package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
 import lombok.Getter;
 import lombok.Setter;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.concurrent.ConcurrentException;
+import org.apache.commons.lang3.concurrent.LazyInitializer;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 
@@ -60,6 +65,20 @@ public final class RuleAlteredJobContext {
     
     private final PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
     
+    private final LazyInitializer<PipelineDataSourceWrapper> sourceDataSourceLazyInitializer = new LazyInitializer<PipelineDataSourceWrapper>() {
+        @Override
+        protected PipelineDataSourceWrapper initialize() {
+            return dataSourceManager.getDataSource(taskConfig.getDumperConfig().getDataSourceConfig());
+        }
+    };
+    
+    private final LazyInitializer<PipelineTableMetaDataLoader> sourceMetaDataLoaderLazyInitializer = new LazyInitializer<PipelineTableMetaDataLoader>() {
+        @Override
+        protected PipelineTableMetaDataLoader initialize() throws ConcurrentException {
+            return new PipelineTableMetaDataLoader(sourceDataSourceLazyInitializer.get());
+        }
+    };
+    
     private RuleAlteredJobPreparer jobPreparer;
     
     public RuleAlteredJobContext(final JobConfiguration jobConfig) {
@@ -72,6 +91,26 @@ public final class RuleAlteredJobContext {
     }
     
     /**
+     * Get source data source.
+     *
+     * @return source data source
+     */
+    @SneakyThrows(ConcurrentException.class)
+    public PipelineDataSourceWrapper getSourceDataSource() {
+        return sourceDataSourceLazyInitializer.get();
+    }
+    
+    /**
+     * Get source metadata loader.
+     *
+     * @return source metadata loader
+     */
+    @SneakyThrows(ConcurrentException.class)
+    public PipelineTableMetaDataLoader getSourceMetaDataLoader() {
+        return sourceMetaDataLoaderLazyInitializer.get();
+    }
+    
+    /**
      * Release resources.
      */
     public void close() {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index fa95de7..e756f4b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -29,6 +29,7 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.PositionInitializerFactory;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
@@ -64,8 +65,8 @@ public final class RuleAlteredJobPreparer {
         prepareTarget(jobContext.getJobConfig(), dataSourceManager);
         initAndCheckDataSource(jobContext);
         try {
-            initIncrementalTasks(jobContext, dataSourceManager);
-            initInventoryTasks(jobContext, dataSourceManager);
+            initIncrementalTasks(jobContext);
+            initInventoryTasks(jobContext);
             log.info("prepare, jobId={}, shardingItem={}, inventoryTasks={}, incrementalTasks={}",
                     jobContext.getJobId(), jobContext.getShardingItem(), jobContext.getInventoryTasks(), jobContext.getIncrementalTasks());
         } catch (final SQLException ex) {
@@ -112,18 +113,20 @@ public final class RuleAlteredJobPreparer {
         dataSourceChecker.checkTargetTable(targetDataSources, jobContext.getTaskConfig().getImporterConfig().getShardingColumnsMap().keySet());
     }
     
-    private void initInventoryTasks(final RuleAlteredJobContext jobContext, final PipelineDataSourceManager dataSourceManager) {
-        List<InventoryTask> allInventoryTasks = inventoryTaskSplitter.splitInventoryData(jobContext, dataSourceManager);
+    private void initInventoryTasks(final RuleAlteredJobContext jobContext) {
+        List<InventoryTask> allInventoryTasks = inventoryTaskSplitter.splitInventoryData(jobContext);
         jobContext.getInventoryTasks().addAll(allInventoryTasks);
     }
     
-    private void initIncrementalTasks(final RuleAlteredJobContext jobContext, final PipelineDataSourceManager dataSourceManager) throws SQLException {
+    private void initIncrementalTasks(final RuleAlteredJobContext jobContext) throws SQLException {
         PipelineChannelFactory pipelineChannelFactory = jobContext.getRuleAlteredContext().getPipelineChannelFactory();
         ExecuteEngine incrementalDumperExecuteEngine = jobContext.getRuleAlteredContext().getIncrementalDumperExecuteEngine();
         TaskConfiguration taskConfig = jobContext.getTaskConfig();
+        PipelineDataSourceManager dataSourceManager = jobContext.getDataSourceManager();
         taskConfig.getDumperConfig().setPosition(getIncrementalPosition(jobContext, taskConfig, dataSourceManager));
+        PipelineTableMetaDataLoader sourceMetaDataLoader = jobContext.getSourceMetaDataLoader();
         IncrementalTask incrementalTask = new IncrementalTask(taskConfig.getHandleConfig().getConcurrency(), taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
-            pipelineChannelFactory, dataSourceManager, incrementalDumperExecuteEngine);
+            pipelineChannelFactory, dataSourceManager, sourceMetaDataLoader, incrementalDumperExecuteEngine);
         jobContext.getIncrementalTasks().add(incrementalTask);
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
index 757b754..2a9ff69 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
@@ -22,7 +22,6 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPosition;
@@ -65,25 +64,26 @@ public final class InventoryTaskSplitter {
      * Split inventory data to multi-tasks.
      *
      * @param jobContext job context
-     * @param dataSourceManager data source manager
      * @return split inventory data task
      */
-    public List<InventoryTask> splitInventoryData(final RuleAlteredJobContext jobContext, final PipelineDataSourceManager dataSourceManager) {
+    public List<InventoryTask> splitInventoryData(final RuleAlteredJobContext jobContext) {
         List<InventoryTask> result = new LinkedList<>();
         TaskConfiguration taskConfig = jobContext.getTaskConfig();
         PipelineChannelFactory pipelineChannelFactory = jobContext.getRuleAlteredContext().getPipelineChannelFactory();
+        PipelineDataSourceManager dataSourceManager = jobContext.getDataSourceManager();
+        DataSource dataSource = jobContext.getSourceDataSource();
+        PipelineTableMetaDataLoader metaDataLoader = jobContext.getSourceMetaDataLoader();
         ExecuteEngine importerExecuteEngine = jobContext.getRuleAlteredContext().getImporterExecuteEngine();
-        for (InventoryDumperConfiguration each : splitDumperConfig(jobContext, taskConfig.getDumperConfig(), dataSourceManager)) {
-            result.add(new InventoryTask(each, taskConfig.getImporterConfig(), pipelineChannelFactory, dataSourceManager, importerExecuteEngine));
+        for (InventoryDumperConfiguration each : splitDumperConfig(jobContext, taskConfig.getDumperConfig())) {
+            result.add(new InventoryTask(each, taskConfig.getImporterConfig(), pipelineChannelFactory, dataSourceManager, dataSource, metaDataLoader, importerExecuteEngine));
         }
         return result;
     }
     
-    private Collection<InventoryDumperConfiguration> splitDumperConfig(
-            final RuleAlteredJobContext jobContext, final DumperConfiguration dumperConfig, final PipelineDataSourceManager dataSourceManager) {
+    private Collection<InventoryDumperConfiguration> splitDumperConfig(final RuleAlteredJobContext jobContext, final DumperConfiguration dumperConfig) {
         Collection<InventoryDumperConfiguration> result = new LinkedList<>();
-        PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
-        PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource);
+        DataSource dataSource = jobContext.getSourceDataSource();
+        PipelineTableMetaDataLoader metaDataLoader = jobContext.getSourceMetaDataLoader();
         for (InventoryDumperConfiguration each : splitByTable(dumperConfig)) {
             result.addAll(splitByPrimaryKey(jobContext, dataSource, metaDataLoader, each));
         }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java
index eed54f6..3ecacf1 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/dumper/DumperFactory.java
@@ -24,12 +24,13 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
 
+import javax.sql.DataSource;
 import java.lang.reflect.Constructor;
 
 /**
@@ -42,16 +43,18 @@ public final class DumperFactory {
      * Create inventory dumper.
      *
      * @param inventoryDumperConfig inventory dumper configuration
-     * @param dataSourceManager data source factory
      * @param channel channel
+     * @param sourceDataSource data source
+     * @param sourceMetaDataLoader metadata loader
      * @return inventory dumper
      */
     @SneakyThrows(ReflectiveOperationException.class)
-    public static InventoryDumper createInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
+    public static InventoryDumper createInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineChannel channel,
+                                                        final DataSource sourceDataSource, final PipelineTableMetaDataLoader sourceMetaDataLoader) {
         ScalingEntry scalingEntry = ScalingEntryLoader.getInstance(inventoryDumperConfig.getDataSourceConfig().getDatabaseType().getName());
         Constructor<? extends InventoryDumper> constructor = scalingEntry.getInventoryDumperClass()
-                .getConstructor(InventoryDumperConfiguration.class, PipelineDataSourceManager.class, PipelineChannel.class);
-        return constructor.newInstance(inventoryDumperConfig, dataSourceManager, channel);
+                .getConstructor(InventoryDumperConfiguration.class, PipelineChannel.class, DataSource.class, PipelineTableMetaDataLoader.class);
+        return constructor.newInstance(inventoryDumperConfig, channel, sourceDataSource, sourceMetaDataLoader);
     }
     
     /**
@@ -59,17 +62,17 @@ public final class DumperFactory {
      *
      * @param dumperConfig dumper configuration
      * @param position position
-     * @param dataSourceManager data source manager
      * @param channel channel
+     * @param sourceMetaDataLoader metadata loader
      * @return incremental dumper
      */
     @SneakyThrows(ReflectiveOperationException.class)
     public static IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<?> position,
-                                                            final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
+                                                            final PipelineChannel channel, final PipelineTableMetaDataLoader sourceMetaDataLoader) {
         String databaseType = dumperConfig.getDataSourceConfig().getDatabaseType().getName();
         ScalingEntry scalingEntry = ScalingEntryLoader.getInstance(databaseType);
         Constructor<? extends IncrementalDumper> constructor = scalingEntry.getIncrementalDumperClass()
-                .getConstructor(DumperConfiguration.class, IngestPosition.class, PipelineDataSourceManager.class, PipelineChannel.class);
-        return constructor.newInstance(dumperConfig, position, dataSourceManager, channel);
+                .getConstructor(DumperConfiguration.class, IngestPosition.class, PipelineChannel.class, PipelineTableMetaDataLoader.class);
+        return constructor.newInstance(dumperConfig, position, channel, sourceMetaDataLoader);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 6009940..d90e75d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -30,7 +30,6 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
@@ -79,13 +78,13 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
     }
     
     public MySQLIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<BinlogPosition> binlogPosition,
-                                  final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
-        super(dumperConfig, binlogPosition, dataSourceManager, channel);
+                                  final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
+        super(dumperConfig, binlogPosition, channel, metaDataLoader);
         this.binlogPosition = (BinlogPosition) binlogPosition;
         this.dumperConfig = dumperConfig;
         Preconditions.checkArgument(dumperConfig.getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration");
         this.channel = channel;
-        metaDataLoader = new PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
+        this.metaDataLoader = metaDataLoader;
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
index 809d5b9..5a22aec 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
@@ -19,9 +19,10 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest;
 
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractInventoryDumper;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 
+import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -36,8 +37,9 @@ public final class MySQLInventoryDumper extends AbstractInventoryDumper {
     
     private static final String YEAR_DATA_TYPE = "YEAR";
     
-    public MySQLInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
-        super(inventoryDumperConfig, dataSourceManager, channel);
+    public MySQLInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineChannel channel,
+                                final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
+        super(inventoryDumperConfig, channel, dataSource, metaDataLoader);
         Properties queryProps = new Properties();
         queryProps.setProperty("yearIsDateType", Boolean.FALSE.toString());
         inventoryDumperConfig.getDataSourceConfig().appendJDBCQueryProperties(queryProps);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 1b44d6a..173ee39 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MultiplexMemoryPipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
@@ -34,6 +35,7 @@ import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteR
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.PlaceholderEvent;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.UpdateRowsEvent;
 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.WriteRowsEvent;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -56,12 +58,20 @@ public final class MySQLIncrementalDumperTest {
     
     private MultiplexMemoryPipelineChannel channel;
     
+    private final PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
+    
     @Before
     public void setUp() {
         DumperConfiguration dumperConfig = mockDumperConfiguration();
         initTableData(dumperConfig);
         channel = new MultiplexMemoryPipelineChannel();
-        incrementalDumper = new MySQLIncrementalDumper(dumperConfig, new BinlogPosition("binlog-000001", 4L), new PipelineDataSourceManager(), channel);
+        PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
+        incrementalDumper = new MySQLIncrementalDumper(dumperConfig, new BinlogPosition("binlog-000001", 4L), channel, metaDataLoader);
+    }
+    
+    @After
+    public void tearDown() {
+        dataSourceManager.close();
     }
     
     private DumperConfiguration mockDumperConfiguration() {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLJdbcDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLJdbcDumperTest.java
index d74c70d..5dbfcf3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLJdbcDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLJdbcDumperTest.java
@@ -20,13 +20,14 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest;
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import javax.sql.DataSource;
@@ -45,22 +46,20 @@ import static org.mockito.Mockito.when;
 @RunWith(MockitoJUnitRunner.class)
 public final class MySQLJdbcDumperTest {
     
-    private PipelineDataSourceManager dataSourceManager;
-    
     private MySQLInventoryDumper mysqlJdbcDumper;
     
-    @Mock
-    private Connection connection;
-    
     @Before
     public void setUp() {
-        dataSourceManager = new PipelineDataSourceManager();
-        mysqlJdbcDumper = new MySQLInventoryDumper(mockInventoryDumperConfiguration(), dataSourceManager, new SimpleMemoryPipelineChannel(100));
+        PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
+        InventoryDumperConfiguration dumperConfig = mockInventoryDumperConfiguration();
+        PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
+        mysqlJdbcDumper = new MySQLInventoryDumper(mockInventoryDumperConfiguration(), new SimpleMemoryPipelineChannel(100),
+                dataSource, new PipelineTableMetaDataLoader(dataSource));
+        initTableData(dataSource);
     }
     
     private InventoryDumperConfiguration mockInventoryDumperConfiguration() {
         DumperConfiguration dumperConfig = mockDumperConfiguration();
-        initTableData(dumperConfig);
         InventoryDumperConfiguration result = new InventoryDumperConfiguration(dumperConfig);
         result.setTableName("t_order");
         return result;
@@ -73,8 +72,7 @@ public final class MySQLJdbcDumperTest {
     }
     
     @SneakyThrows(SQLException.class)
-    private void initTableData(final DumperConfiguration dumperConfig) {
-        DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
+    private void initTableData(final DataSource dataSource) {
         try (Connection connection = dataSource.getConnection();
              Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");
@@ -98,6 +96,7 @@ public final class MySQLJdbcDumperTest {
     
     @Test
     public void assertCreatePreparedStatement() throws SQLException {
+        Connection connection = mock(Connection.class);
         when(connection.prepareStatement("", ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)).thenReturn(mock(PreparedStatement.class));
         PreparedStatement preparedStatement = mysqlJdbcDumper.createPreparedStatement(connection, "");
         verify(preparedStatement).setFetchSize(Integer.MIN_VALUE);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
index a91ba15..693d973 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
@@ -23,10 +23,10 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Standa
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
 import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.OpenGaussLogicalReplication;
 import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.MppdbDecodingPlugin;
@@ -64,15 +64,15 @@ public final class OpenGaussWalDumper extends AbstractIncrementalDumper<WalPosit
     private final PipelineChannel channel;
     
     public OpenGaussWalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position,
-                              final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
-        super(dumperConfig, position, dataSourceManager, channel);
+                              final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
+        super(dumperConfig, position, channel, metaDataLoader);
         walPosition = (WalPosition) position;
         if (!StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass())) {
             throw new UnsupportedOperationException("PostgreSQLWalDumper only support PipelineDataSourceConfiguration");
         }
         this.dumperConfig = dumperConfig;
         this.channel = channel;
-        walEventConverter = new WalEventConverter(dumperConfig, dataSourceManager);
+        walEventConverter = new WalEventConverter(dumperConfig, metaDataLoader);
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java
index 4b9a005..2f6a38b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java
@@ -19,10 +19,11 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
 
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractInventoryDumper;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.postgresql.util.PGobject;
 
+import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -38,8 +39,9 @@ public final class PostgreSQLInventoryDumper extends AbstractInventoryDumper {
     
     private static final String PG_BIT_TYPE = "bit";
     
-    public PostgreSQLInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
-        super(inventoryDumperConfig, dataSourceManager, channel);
+    public PostgreSQLInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineChannel channel,
+                                     final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
+        super(inventoryDumperConfig, channel, dataSource, metaDataLoader);
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
index 4ae61bc..effdcf1 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
@@ -23,9 +23,9 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Standa
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.LogicalReplication;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalEventConverter;
@@ -60,15 +60,15 @@ public final class PostgreSQLWalDumper extends AbstractIncrementalDumper<WalPosi
     private final PipelineChannel channel;
     
     public PostgreSQLWalDumper(final DumperConfiguration dumperConfig, final IngestPosition<WalPosition> position,
-                               final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
-        super(dumperConfig, position, dataSourceManager, channel);
+                               final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
+        super(dumperConfig, position, channel, metaDataLoader);
         walPosition = (WalPosition) position;
         if (!StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass())) {
             throw new UnsupportedOperationException("PostgreSQLWalDumper only support PipelineDataSourceConfiguration");
         }
         this.dumperConfig = dumperConfig;
         this.channel = channel;
-        walEventConverter = new WalEventConverter(dumperConfig, dataSourceManager);
+        walEventConverter = new WalEventConverter(dumperConfig, metaDataLoader);
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
index c65987f..57038e5 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
@@ -22,7 +22,6 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
@@ -44,9 +43,9 @@ public final class WalEventConverter {
     
     private final PipelineTableMetaDataLoader metaDataLoader;
     
-    public WalEventConverter(final DumperConfiguration dumperConfig, final PipelineDataSourceManager dataSourceManager) {
+    public WalEventConverter(final DumperConfiguration dumperConfig, final PipelineTableMetaDataLoader metaDataLoader) {
         this.dumperConfig = dumperConfig;
-        metaDataLoader = new PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
+        this.metaDataLoader = metaDataLoader;
     }
     
     /**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
index 7733e00..7c0b852 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
@@ -20,9 +20,11 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -37,27 +39,29 @@ import static org.junit.Assert.assertThat;
 
 public final class PostgreSQLJdbcDumperTest {
     
-    private PipelineDataSourceManager dataSourceManager;
+    private PipelineDataSourceWrapper dataSource;
     
     private PostgreSQLInventoryDumper jdbcDumper;
     
     @Before
     public void setUp() {
-        dataSourceManager = new PipelineDataSourceManager();
-        jdbcDumper = new PostgreSQLInventoryDumper(mockInventoryDumperConfiguration(), dataSourceManager, new SimpleMemoryPipelineChannel(100));
+        PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
+        InventoryDumperConfiguration dumperConfig = mockInventoryDumperConfiguration();
+        dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
+        jdbcDumper = new PostgreSQLInventoryDumper(mockInventoryDumperConfiguration(), new SimpleMemoryPipelineChannel(100),
+                dataSource, new PipelineTableMetaDataLoader(dataSource));
+        initTableData(dataSource);
     }
     
     private InventoryDumperConfiguration mockInventoryDumperConfiguration() {
         DumperConfiguration dumperConfig = mockDumperConfiguration();
-        initTableData(dumperConfig);
         InventoryDumperConfiguration result = new InventoryDumperConfiguration(dumperConfig);
         result.setTableName("t_order");
         return result;
     }
     
     @SneakyThrows(SQLException.class)
-    private void initTableData(final DumperConfiguration dumperConfig) {
-        DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
+    private void initTableData(final DataSource dataSource) {
         try (Connection connection = dataSource.getConnection();
              Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");
@@ -68,7 +72,6 @@ public final class PostgreSQLJdbcDumperTest {
     
     @Test
     public void assertCreatePreparedStatement() throws SQLException {
-        DataSource dataSource = dataSourceManager.getDataSource(mockDumperConfiguration().getDataSourceConfig());
         try (Connection connection = dataSource.getConnection();
              PreparedStatement preparedStatement = jdbcDumper.createPreparedStatement(connection, "SELECT * FROM t_order")) {
             assertThat(preparedStatement.getFetchSize(), is(1));
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
index c54da70..dbaf5a6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
@@ -18,14 +18,17 @@
 package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
 
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MultiplexMemoryPipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.LogicalReplication;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -61,17 +64,26 @@ public final class PostgreSQLWalDumperTest {
     
     private WalPosition position;
     
-    private PostgreSQLWalDumper walDumper;
+    private DumperConfiguration dumperConfig;
     
-    private StandardPipelineDataSourceConfiguration pipelineDataSourceConfig;
+    private PostgreSQLWalDumper walDumper;
     
     private MultiplexMemoryPipelineChannel channel;
     
+    private final PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
+    
     @Before
     public void setUp() {
         position = new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));
         channel = new MultiplexMemoryPipelineChannel();
-        walDumper = new PostgreSQLWalDumper(mockDumperConfiguration(), position, new PipelineDataSourceManager(), channel);
+        dumperConfig = mockDumperConfiguration();
+        PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
+        walDumper = new PostgreSQLWalDumper(dumperConfig, position, channel, metaDataLoader);
+    }
+    
+    @After
+    public void tearDown() {
+        dataSourceManager.close();
     }
     
     private DumperConfiguration mockDumperConfiguration() {
@@ -86,9 +98,9 @@ public final class PostgreSQLWalDumperTest {
         } catch (final SQLException e) {
             throw new RuntimeException("Init table failed", e);
         }
-        pipelineDataSourceConfig = new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password);
+        PipelineDataSourceConfiguration dataSourceConfig = new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password);
         DumperConfiguration result = new DumperConfiguration();
-        result.setDataSourceConfig(pipelineDataSourceConfig);
+        result.setDataSourceConfig(dataSourceConfig);
         Map<String, String> tableNameMap = new HashMap<>();
         tableNameMap.put("t_order_0", "t_order");
         result.setTableNameMap(tableNameMap);
@@ -97,9 +109,10 @@ public final class PostgreSQLWalDumperTest {
     
     @Test
     public void assertStart() throws SQLException, NoSuchFieldException, IllegalAccessException {
+        StandardPipelineDataSourceConfiguration dataSourceConfig = (StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig();
         try {
             ReflectionUtil.setFieldValue(walDumper, "logicalReplication", logicalReplication);
-            when(logicalReplication.createConnection(pipelineDataSourceConfig)).thenReturn(pgConnection);
+            when(logicalReplication.createConnection(dataSourceConfig)).thenReturn(pgConnection);
             when(pgConnection.unwrap(PgConnection.class)).thenReturn(pgConnection);
             when(logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionInitializer.getUniqueSlotName(pgConnection), position.getLogSequenceNumber()))
                     .thenReturn(pgReplicationStream);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
index 1a18864..df3f1ab 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
@@ -25,11 +25,13 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderReco
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.WriteRowEvent;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -48,13 +50,20 @@ public final class WalEventConverterTest {
     
     private WalEventConverter walEventConverter;
     
+    private final PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
+    
     @Before
     public void setUp() {
         DumperConfiguration dumperConfig = mockDumperConfiguration();
-        walEventConverter = new WalEventConverter(dumperConfig, new PipelineDataSourceManager());
+        walEventConverter = new WalEventConverter(dumperConfig, new PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig())));
         initTableData(dumperConfig);
     }
     
+    @After
+    public void tearDown() {
+        dataSourceManager.close();
+    }
+    
     private DumperConfiguration mockDumperConfiguration() {
         DumperConfiguration result = new DumperConfiguration();
         result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root"));
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index 7349617..2ed943b 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.api.impl;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
@@ -28,6 +29,7 @@ import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstan
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.fixture.EmbedTestingServer;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.JobProgressYamlSwapper;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
@@ -52,6 +54,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 public final class GovernanceRepositoryAPIImplTest {
     
@@ -130,14 +133,17 @@ public final class GovernanceRepositoryAPIImplTest {
         dumperConfig.setTableName("t_order");
         dumperConfig.setPrimaryKey("order_id");
         dumperConfig.setShardingItem(0);
+        PipelineDataSourceWrapper dataSource = mock(PipelineDataSourceWrapper.class);
+        PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource);
         return new InventoryTask(dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelFactory(),
-                new PipelineDataSourceManager(), PipelineContextUtil.getExecuteEngine());
+                new PipelineDataSourceManager(), dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine());
     }
     
     private IncrementalTask mockIncrementalTask(final TaskConfiguration taskConfig) {
         DumperConfiguration dumperConfig = taskConfig.getDumperConfig();
         dumperConfig.setPosition(new PlaceholderPosition());
+        PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
         return new IncrementalTask(3, dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelFactory(),
-                new PipelineDataSourceManager(), PipelineContextUtil.getExecuteEngine());
+                new PipelineDataSourceManager(), metaDataLoader, PipelineContextUtil.getExecuteEngine());
     }
 }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
index 32bec41..b97eebb 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
@@ -21,14 +21,14 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 
 public final class FixtureIncrementalDumper extends AbstractIncrementalDumper<FinishedPosition> {
     
     public FixtureIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition<FinishedPosition> position,
-                                    final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
-        super(dumperConfig, position, dataSourceManager, channel);
+                                    final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
+        super(dumperConfig, position, channel, metaDataLoader);
     }
     
     @Override
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumper.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumper.java
index 7bc7a8c..3da4758 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumper.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumper.java
@@ -19,17 +19,19 @@ package org.apache.shardingsphere.data.pipeline.core.fixture;
 
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractInventoryDumper;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 
+import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 
 public final class FixtureInventoryDumper extends AbstractInventoryDumper {
     
-    public FixtureInventoryDumper(final InventoryDumperConfiguration dumperConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
-        super(dumperConfig, dataSourceManager, channel);
+    public FixtureInventoryDumper(final InventoryDumperConfiguration dumperConfig, final PipelineChannel channel,
+                                  final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
+        super(dumperConfig, channel, dataSource, metaDataLoader);
     }
     
     @Override
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index bf1c14f..52765c2 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -18,8 +18,10 @@
 package org.apache.shardingsphere.data.pipeline.core.task;
 
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import org.apache.shardingsphere.data.pipeline.core.util.ResourceUtil;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
@@ -31,6 +33,7 @@ import org.junit.Test;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 public final class IncrementalTaskTest {
     
@@ -45,9 +48,10 @@ public final class IncrementalTaskTest {
     public void setUp() {
         TaskConfiguration taskConfig = new RuleAlteredJobContext(ResourceUtil.mockJobConfig()).getTaskConfig();
         taskConfig.getDumperConfig().setPosition(new PlaceholderPosition());
+        PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
         incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
                 PipelineContextUtil.getPipelineChannelFactory(),
-                new PipelineDataSourceManager(), PipelineContextUtil.getExecuteEngine());
+                new PipelineDataSourceManager(), metaDataLoader, PipelineContextUtil.getExecuteEngine());
     }
     
     @Test
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index 57320bc..b80d201 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -26,9 +26,11 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPositio
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPosition;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import org.apache.shardingsphere.data.pipeline.core.util.ResourceUtil;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -42,12 +44,19 @@ public final class InventoryTaskTest {
     
     private static TaskConfiguration taskConfig;
     
+    private static final PipelineDataSourceManager DATA_SOURCE_MANAGER = new PipelineDataSourceManager();
+    
     @BeforeClass
     public static void beforeClass() {
         PipelineContextUtil.mockModeConfig();
         taskConfig = new RuleAlteredJobContext(ResourceUtil.mockJobConfig()).getTaskConfig();
     }
     
+    @AfterClass
+    public static void afterClass() {
+        DATA_SOURCE_MANAGER.close();
+    }
+    
     @Test(expected = IngestException.class)
     public void assertStartWithGetEstimatedRowsFailure() {
         InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(taskConfig.getDumperConfig());
@@ -57,9 +66,11 @@ public final class InventoryTaskTest {
             position = new PrimaryKeyPosition(0, 1000);
         }
         inventoryDumperConfig.setPosition(position);
+        PipelineDataSourceWrapper dataSource = DATA_SOURCE_MANAGER.getDataSource(inventoryDumperConfig.getDataSourceConfig());
+        PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource);
         try (InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
                 PipelineContextUtil.getPipelineChannelFactory(),
-                new PipelineDataSourceManager(), PipelineContextUtil.getExecuteEngine())) {
+                DATA_SOURCE_MANAGER, dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine())) {
             inventoryTask.start();
         }
     }
@@ -74,9 +85,11 @@ public final class InventoryTaskTest {
             position = new PrimaryKeyPosition(0, 1000);
         }
         inventoryDumperConfig.setPosition(position);
+        PipelineDataSourceWrapper dataSource = DATA_SOURCE_MANAGER.getDataSource(inventoryDumperConfig.getDataSourceConfig());
+        PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource);
         try (InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
                 PipelineContextUtil.getPipelineChannelFactory(),
-                new PipelineDataSourceManager(), PipelineContextUtil.getExecuteEngine())) {
+                new PipelineDataSourceManager(), dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine())) {
             inventoryTask.start();
             assertFalse(inventoryTask.getProgress().getPosition() instanceof FinishedPosition);
         }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
index b7ae201..8a03427 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
@@ -51,12 +51,12 @@ public final class InventoryTaskSplitterTest {
     @Before
     public void setUp() {
         initJobContext();
-        dataSourceManager = new PipelineDataSourceManager();
         inventoryTaskSplitter = new InventoryTaskSplitter();
     }
     
     private void initJobContext() {
         jobContext = new RuleAlteredJobContext(ResourceUtil.mockJobConfig());
+        dataSourceManager = jobContext.getDataSourceManager();
         taskConfig = jobContext.getTaskConfig();
     }
     
@@ -64,7 +64,7 @@ public final class InventoryTaskSplitterTest {
     public void assertSplitInventoryDataWithEmptyTable() throws SQLException {
         taskConfig.getHandleConfig().setShardingSize(10);
         initEmptyTablePrimaryEnvironment(taskConfig.getDumperConfig());
-        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, dataSourceManager);
+        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext);
         assertNotNull(actual);
         assertThat(actual.size(), is(1));
         assertThat(((PrimaryKeyPosition) actual.get(0).getProgress().getPosition()).getBeginValue(), is(0L));
@@ -75,7 +75,7 @@ public final class InventoryTaskSplitterTest {
     public void assertSplitInventoryDataWithIntPrimary() throws SQLException {
         taskConfig.getHandleConfig().setShardingSize(10);
         initIntPrimaryEnvironment(taskConfig.getDumperConfig());
-        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, dataSourceManager);
+        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext);
         assertNotNull(actual);
         assertThat(actual.size(), is(10));
         assertThat(((PrimaryKeyPosition) actual.get(9).getProgress().getPosition()).getBeginValue(), is(91L));
@@ -85,7 +85,7 @@ public final class InventoryTaskSplitterTest {
     @Test
     public void assertSplitInventoryDataWithCharPrimary() throws SQLException {
         initCharPrimaryEnvironment(taskConfig.getDumperConfig());
-        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, dataSourceManager);
+        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext);
         assertNotNull(actual);
         assertThat(actual.size(), is(1));
     }
@@ -93,7 +93,7 @@ public final class InventoryTaskSplitterTest {
     @Test
     public void assertSplitInventoryDataWithUnionPrimary() throws SQLException {
         initUnionPrimaryEnvironment(taskConfig.getDumperConfig());
-        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, dataSourceManager);
+        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext);
         assertNotNull(actual);
         assertThat(actual.size(), is(1));
     }
@@ -101,7 +101,7 @@ public final class InventoryTaskSplitterTest {
     @Test
     public void assertSplitInventoryDataWithoutPrimary() throws SQLException {
         initNoPrimaryEnvironment(taskConfig.getDumperConfig());
-        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, dataSourceManager);
+        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext);
         assertNotNull(actual);
         assertThat(actual.size(), is(1));
     }