Posted to notifications@shardingsphere.apache.org by az...@apache.org on 2023/06/02 10:20:38 UTC

[shardingsphere] branch master updated: Refactor CDC layer (#25994)

This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d7bc66a905 Refactor CDC layer (#25994)
2d7bc66a905 is described below

commit 2d7bc66a905693a2eaab1f0d6ae61b2fc45c8510
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Fri Jun 2 18:20:27 2023 +0800

    Refactor CDC layer (#25994)
    
    * Move AutoCloseable to PipelineTask
    
    * Extract TaskExecuteCallback from InventoryTask and IncrementalTask
    
    * Replace AutoCloseable with Closeable for PipelineTask
    
    * Replace InventoryTask and IncrementalTask with PipelineTask in InventoryIncrementalJobItemContext; update related classes
    
    * Extract AckCallbacks
    
    * Extract PipelineTaskUtils
    
    * Add inventory and incremental task for CDC; Refactor CDCJobPreparer
    
    * Simplify InventoryTask params for better reuse
    
    * Simplify IncrementalTask params for better reuse
    
    * Remove getConnector() in migration
    
    * Rename ImporterConnector to PipelineSink
    
    * Rename connector to sink
    
    * Remove jobShardingCount in PipelineSocketSink
    
    * Add SingleChannelConsumerImporter
    
    * Merge DataSourceImporter and PipelineDataSourceSink
    
    * Replace DataSourceImporter ref; Remove DataSourceImporterCreator
    
    * Add CDCImporter, CDCAckId;
    Remove SocketSinkImporter, SocketSinkImporterCreator, CDCAckHolder, CDCDataRecordUtils, DataRecordComparatorGenerator;
    Refactor PipelineChannel, PipelineSink and impls;
    
    * Refactor CDCJob execution
    
    * Refactor CDC job composing
    
    * Remove unused classes; Remove PipelineSink.getType
    
    * Refactor PipelineSink.getConnector() to identifierMatched()
    
    * Refactor TreeMap to PriorityQueue in CDCImporter
    
    * Update core unit test
    
    * Update IT unit tests
    
    * Remove channel from InventoryTask and IncrementalTask
    
    * Refactor CDC job composing
    
    * Update
    
    * Clean code
    
    * Simplify InventoryIncrementalTasksRunner constructor param
    
    * Refactor CDCTasksRunner construction and stop; Cache CDCImporter before running
    
    * Refactor CDCImporter, add CDCChannelProgressPair
    
    * Update to pass CDC E2E MySQL
    
    * Update to pass CDC E2E openGauss
    
    * Add missed license header
    
    * Replace Guava cache with Caffeine cache
---
 .../api/executor/AbstractLifecycleExecutor.java    |   9 -
 .../api/ingest/channel/PipelineChannel.java        |  22 +-
 kernel/data-pipeline/cdc/core/pom.xml              |  16 +-
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      |  15 +-
 .../cdc/context/job/CDCJobItemContext.java         |  22 +-
 .../data/pipeline/cdc/core/ack/CDCAckHolder.java   |  87 -------
 .../data/pipeline/cdc/core/ack/CDCAckId.java       |  67 ++++++
 .../data/pipeline/cdc/core/ack/CDCAckPosition.java |  14 --
 .../connector/SocketSinkImporterConnector.java     | 250 ---------------------
 .../cdc/core/importer/CDCChannelProgressPair.java} |  27 +--
 .../pipeline/cdc/core/importer/CDCImporter.java    | 214 ++++++++++++++++++
 .../cdc/core/importer/CDCImporterManager.java}     |  49 ++--
 .../pipeline/cdc/core/importer/CSNRecords.java}    |  30 +--
 .../cdc/core/importer/CSNRecordsComparator.java    |  23 +-
 .../cdc/core/importer/SocketSinkImporter.java      | 120 ----------
 .../core/importer/SocketSinkImporterCreator.java   |  44 ----
 .../cdc/core/importer/sink/CDCSocketSink.java      | 113 ++++++++++
 .../data/pipeline/cdc/core/job/CDCJob.java         | 164 ++++++++++++--
 .../pipeline/cdc/core/prepare/CDCJobPreparer.java  | 116 +++++++---
 .../pipeline/cdc/core/task/CDCIncrementalTask.java |  79 +++++++
 .../pipeline/cdc/core/task/CDCInventoryTask.java   |  86 +++++++
 .../pipeline/cdc/core/task/CDCTasksRunner.java     |  39 +++-
 .../generator/DataRecordComparatorGenerator.java   |  47 ----
 .../pipeline/cdc/handler/CDCBackendHandler.java    |  45 ++--
 .../data/pipeline/cdc/util/CDCDataRecordUtils.java | 115 ----------
 ...here.data.pipeline.spi.importer.ImporterCreator |  18 --
 .../pipeline/cdc/core/ack/CDCAckHolderTest.java    |  64 ------
 .../data/pipeline/cdc/core/ack/CDCAckIdTest.java   |  24 +-
 .../core/importer/CSNRecordsComparatorTest.java    |  46 ++++
 .../importer/SocketSinkImporterCreatorTest.java    |  48 ----
 .../DataRecordComparatorGeneratorTest.java         |  55 -----
 .../pipeline/cdc/util/CDCDataRecordUtilsTest.java  |  73 ------
 .../InventoryIncrementalJobItemContext.java        |   8 +-
 .../core/importer/DataSourceImporterCreator.java   |  44 ----
 .../importer/SingleChannelConsumerImporter.java    |  74 ++++++
 .../PipelineDataSourceSink.java}                   |  55 ++---
 .../pipeline/core/ingest/channel/AckCallbacks.java |  61 +++++
 .../memory/MultiplexMemoryPipelineChannel.java     |  12 +-
 .../memory/SimpleMemoryPipelineChannel.java        |  26 ++-
 .../core/ingest/dumper/InventoryDumper.java        |   4 +-
 .../pipeline/core/job/AbstractPipelineJob.java     |   2 +-
 .../data/pipeline/core/job/PipelineJobCenter.java  |  10 +
 .../core/prepare/InventoryTaskSplitter.java        |  29 ++-
 .../data/pipeline/core/task/IncrementalTask.java   |  93 +-------
 .../core/task/InventoryIncrementalTasksRunner.java |  15 +-
 .../data/pipeline/core/task/InventoryTask.java     |  76 +------
 .../data/pipeline/core/task/PipelineTask.java      |   3 +-
 .../data/pipeline/core/task/PipelineTaskUtils.java |  89 ++++++++
 .../TaskExecuteCallback.java}                      |  24 +-
 .../data/pipeline/core/util/PipelineJdbcUtils.java |  20 ++
 .../pipeline/spi/importer/ImporterCreator.java     |  47 ----
 .../importer/sink/PipelineSink.java}               |  45 ++--
 ...here.data.pipeline.spi.importer.ImporterCreator |  18 --
 .../pipeline/scenario/migration/MigrationJob.java  |   3 +-
 .../migration/context/MigrationJobItemContext.java |   9 +-
 .../migration/prepare/MigrationJobPreparer.java    |  60 +++--
 .../pipeline/cases/PipelineContainerComposer.java  |   9 +-
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java |   2 +-
 .../cases/cdc/DataSourceRecordConsumer.java        |   1 +
 .../api/impl/GovernanceRepositoryAPIImplTest.java  |  20 +-
 .../pipeline/core/fixture/FixtureImporter.java     |   4 +-
 .../core/fixture/FixtureImporterCreator.java       |  44 ----
 .../FixtureInventoryIncrementalJobItemContext.java |   4 +-
 ...erTest.java => PipelineDataSourceSinkTest.java} |  32 +--
 .../core/prepare/InventoryTaskSplitterTest.java    |   2 +-
 .../pipeline/core/task/IncrementalTaskTest.java    |  15 +-
 .../data/pipeline/core/task/InventoryTaskTest.java |  27 ++-
 ...here.data.pipeline.spi.importer.ImporterCreator |  18 --
 68 files changed, 1501 insertions(+), 1645 deletions(-)
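
For orientation before the hunks: the renamed PipelineSink abstraction is only partially visible below, so here is a minimal sketch of its apparent shape, inferred from the sink.write(ackId, records) call site in CDCImporter and the identifierMatched() bullet in the commit message. Both signatures are assumptions, not the committed interface.

    import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;

    import java.util.List;

    public interface PipelineSink {

        // Assumed signature: matches a sink against an identifier such as a Netty channel,
        // replacing the removed getConnector() accessor.
        boolean identifierMatched(Object identifier);

        // Assumed void return: the CDCImporter call site below ignores any result.
        void write(String ackId, List<Record> records);
    }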

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java
index 463da675642..ac008c426aa 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java
@@ -23,7 +23,6 @@ import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -50,7 +49,6 @@ public abstract class AbstractLifecycleExecutor implements LifecycleExecutor {
         running = true;
         startTimeMillis = System.currentTimeMillis();
         runBlocking();
-        stop();
     }
     
     /**
@@ -78,13 +76,6 @@ public abstract class AbstractLifecycleExecutor implements LifecycleExecutor {
     
     protected abstract void doStop() throws SQLException;
     
-    protected final void cancelStatement(final Statement statement) throws SQLException {
-        if (null == statement || statement.isClosed()) {
-            return;
-        }
-        statement.cancel();
-    }
-    
     @Override
     public final void run() {
         start();
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/PipelineChannel.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/PipelineChannel.java
index 25ce5a44a59..c99a7986b1e 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/PipelineChannel.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/PipelineChannel.java
@@ -19,13 +19,14 @@ package org.apache.shardingsphere.data.pipeline.api.ingest.channel;
 
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 
+import java.io.Closeable;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 /**
  * Pipeline channel.
  */
-public interface PipelineChannel {
+public interface PipelineChannel extends Closeable {
     
     /**
      * Push {@code DataRecord} into channel.
@@ -41,15 +42,30 @@ public interface PipelineChannel {
      * @param batchSize record batch size
      * @param timeout timeout
      * @param timeUnit time unit
-     * @return record
+     * @return records of transactions
      */
-    List<Record> fetchRecords(int batchSize, int timeout, TimeUnit timeUnit);
+    List<Record> fetchRecords(int batchSize, long timeout, TimeUnit timeUnit);
+    
+    /**
+     * Peek {@code Record} list from channel.
+     *
+     * @return records of a transaction
+     */
+    List<Record> peekRecords();
+    
+    /**
+     * Poll {@code Record} list from channel.
+     *
+     * @return records of a transaction
+     */
+    List<Record> pollRecords();
     
     /**
      * Ack the last batch.
      *
      * @param records record list
      */
+    // TODO Refactor ack param
     void ack(List<Record> records);
     
     /**
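
Taken together, the revised contract supports a fetch-process-ack loop as well as transaction-granular peek/poll access. A minimal consumer sketch against this interface (batch size, timeout value, and the isRunning() flag are placeholders from the surrounding executor, not part of this diff):

    // Fetch a batch, hand it to a sink, then ack so the channel can release it.
    private void consume(final PipelineChannel channel) {
        while (isRunning()) {
            List<Record> records = channel.fetchRecords(1000, 500L, TimeUnit.MILLISECONDS);
            if (records.isEmpty()) {
                continue;
            }
            // ... write records to a sink or importer here ...
            channel.ack(records);
        }
    }
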
diff --git a/kernel/data-pipeline/cdc/core/pom.xml b/kernel/data-pipeline/cdc/core/pom.xml
index e6992a4fe45..6a0f99297d0 100644
--- a/kernel/data-pipeline/cdc/core/pom.xml
+++ b/kernel/data-pipeline/cdc/core/pom.xml
@@ -40,16 +40,16 @@
         </dependency>
         
         <dependency>
-            <groupId>org.junit.jupiter</groupId>
-            <artifactId>junit-jupiter-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.junit.jupiter</groupId>
-            <artifactId>junit-jupiter-engine</artifactId>
+            <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-test-util</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>org.junit.jupiter</groupId>
-            <artifactId>junit-jupiter-params</artifactId>
+            <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-test-fixture-jdbc</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
         </dependency>
     </dependencies>
 </project>
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 7ed1eb70836..0ffa569b115 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -71,7 +71,7 @@ import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDa
 import org.apache.shardingsphere.data.pipeline.core.prepare.PipelineJobPreparerUtils;
 import org.apache.shardingsphere.data.pipeline.core.sharding.ShardingColumnsExtractor;
 import org.apache.shardingsphere.data.pipeline.core.util.JobDataNodeLineConvertUtils;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import org.apache.shardingsphere.data.pipeline.spi.importer.sink.PipelineSink;
 import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -204,14 +204,21 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
         return result;
     }
     
+    @Override
+    protected JobConfigurationPOJO convertJobConfiguration(final PipelineJobConfiguration jobConfig) {
+        JobConfigurationPOJO result = super.convertJobConfiguration(jobConfig);
+        result.setShardingTotalCount(1);
+        return result;
+    }
+    
     /**
      * Start job.
      *
      * @param jobId job id
-     * @param importerConnector importer connector
+     * @param sink sink
      */
-    public void startJob(final String jobId, final ImporterConnector importerConnector) {
-        CDCJob job = new CDCJob(jobId, importerConnector);
+    public void startJob(final String jobId, final PipelineSink sink) {
+        CDCJob job = new CDCJob(jobId, sink);
         PipelineJobCenter.addJob(jobId, job);
         updateJobConfigurationDisabled(jobId, false);
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
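
Call sites accordingly switch from passing an ImporterConnector to passing a PipelineSink. An illustrative sketch (how the sink is constructed is not shown in this excerpt, so the factory below is hypothetical):

    // The concrete sink would be the CDCSocketSink added in this commit;
    // createSocketSink() is a hypothetical stand-in for that construction.
    PipelineSink sink = createSocketSink();
    new CDCJobAPI().startJob(jobId, sink);
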
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
index f42ef50242b..51284920cdb 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.cdc.context.job;
 
 import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import lombok.SneakyThrows;
 import org.apache.commons.lang3.concurrent.ConcurrentException;
@@ -35,7 +36,7 @@ import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncremental
 import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import org.apache.shardingsphere.data.pipeline.spi.importer.sink.PipelineSink;
 
 import java.util.Collection;
 import java.util.LinkedList;
@@ -44,6 +45,7 @@ import java.util.concurrent.atomic.AtomicLong;
 /**
  * CDC job item context.
  */
+@RequiredArgsConstructor
 @Getter
 public final class CDCJobItemContext implements InventoryIncrementalJobItemContext {
     
@@ -65,7 +67,7 @@ public final class CDCJobItemContext implements InventoryIncrementalJobItemConte
     
     private final PipelineDataSourceManager dataSourceManager;
     
-    private final ImporterConnector importerConnector;
+    private final PipelineSink sink;
     
     private final Collection<PipelineTask> inventoryTasks = new LinkedList<>();
     
@@ -91,17 +93,6 @@ public final class CDCJobItemContext implements InventoryIncrementalJobItemConte
         }
     };
     
-    public CDCJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem, final InventoryIncrementalJobItemProgress initProgress, final CDCProcessContext jobProcessContext,
-                             final CDCTaskConfiguration taskConfig, final PipelineDataSourceManager dataSourceManager, final ImporterConnector importerConnector) {
-        this.jobConfig = jobConfig;
-        this.shardingItem = shardingItem;
-        this.initProgress = initProgress;
-        this.jobProcessContext = jobProcessContext;
-        this.taskConfig = taskConfig;
-        this.dataSourceManager = dataSourceManager;
-        this.importerConnector = importerConnector;
-    }
-    
     @Override
     public String getJobId() {
         return jobConfig.getJobId();
@@ -134,9 +125,8 @@ public final class CDCJobItemContext implements InventoryIncrementalJobItemConte
         return sourceMetaDataLoaderLazyInitializer.get();
     }
     
-    @Override
-    public ImporterConnector getImporterConnector() {
-        return importerConnector;
+    public PipelineSink getSink() {
+        return sink;
     }
     
     @Override
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolder.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolder.java
deleted file mode 100644
index 7d5e07fea05..00000000000
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolder.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.core.ack;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporter;
-
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * CDC ack holder.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class CDCAckHolder {
-    
-    private static final CDCAckHolder INSTANCE = new CDCAckHolder();
-    
-    private final Map<String, Map<SocketSinkImporter, CDCAckPosition>> ackIdPositionMap = new ConcurrentHashMap<>();
-    
-    /**
-     * the ack of CDC.
-     *
-     * @param ackId ack id
-     */
-    public void ack(final String ackId) {
-        Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap = ackIdPositionMap.remove(ackId);
-        if (null != importerDataRecordMap) {
-            importerDataRecordMap.forEach(SocketSinkImporter::ackWithLastDataRecord);
-        }
-    }
-    
-    /**
-     * Bind ack id.
-     *
-     * @param importerDataRecordMap import data record map
-     * @return ack id
-     */
-    public String bindAckIdWithPosition(final Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap) {
-        String result = generateAckId();
-        // TODO it's might need to persist to registry center in cluster mode.
-        ackIdPositionMap.put(result, importerDataRecordMap);
-        return result;
-    }
-    
-    private String generateAckId() {
-        return "ACK-" + UUID.randomUUID();
-    }
-    
-    /**
-     * Clean up.
-     *
-     * @param socketSinkImporter CDC importer
-     */
-    public void cleanUp(final SocketSinkImporter socketSinkImporter) {
-        if (ackIdPositionMap.isEmpty()) {
-            return;
-        }
-        ackIdPositionMap.entrySet().removeIf(entry -> entry.getValue().containsKey(socketSinkImporter));
-    }
-    
-    /**
-     * Get instance.
-     *
-     * @return CDC ack holder
-     */
-    public static CDCAckHolder getInstance() {
-        return INSTANCE;
-    }
-}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckId.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckId.java
new file mode 100644
index 00000000000..f9e414cb04c
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckId.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.ack;
+
+import com.google.common.base.Splitter;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.RandomStringUtils;
+
+import java.util.List;
+
+/**
+ * CDC ack id.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class CDCAckId {
+    
+    private final String importerId;
+    
+    private final String random;
+    
+    /**
+     * Build ack id.
+     *
+     * @param importerId importer id
+     * @return ack id
+     */
+    public static CDCAckId build(final String importerId) {
+        return new CDCAckId(importerId, RandomStringUtils.randomAlphanumeric(16));
+    }
+    
+    /**
+     * Marshal ack id.
+     *
+     * @return ack id
+     */
+    public String marshal() {
+        return importerId + "_" + random;
+    }
+    
+    /**
+     * Unmarshal ack id from text.
+     *
+     * @param text text
+     * @return ack id
+     */
+    public static CDCAckId unmarshal(final String text) {
+        List<String> parts = Splitter.on('_').trimResults().omitEmptyStrings().splitToList(text);
+        return new CDCAckId(parts.get(0), parts.get(1));
+    }
+}
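
A round-trip usage sketch of the new ack id (the importer id value is illustrative; build() appends 16 random alphanumeric characters):

    CDCAckId ackId = CDCAckId.build("a1b2c3d4");
    String text = ackId.marshal();                    // "a1b2c3d4_<16 random chars>"
    CDCAckId restored = CDCAckId.unmarshal(text);
    assert "a1b2c3d4".equals(restored.getImporterId());
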
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckPosition.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckPosition.java
index a62ad348b9d..e1391fc59bf 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckPosition.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckPosition.java
@@ -42,20 +42,6 @@ public final class CDCAckPosition {
         createTimeMills = System.currentTimeMillis();
     }
     
-    public CDCAckPosition(final Record lastRecord, final long createTimeMills) {
-        this.lastRecord = lastRecord;
-        this.createTimeMills = createTimeMills;
-    }
-    
-    /**
-     * Add data record count.
-     *
-     * @param count count.
-     */
-    public void addDataRecordCount(final int count) {
-        dataRecordCount.addAndGet(count);
-    }
-    
     /**
      * Get data record count.
      *
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
deleted file mode 100644
index bc3f6314d67..00000000000
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.core.connector;
-
-import io.netty.channel.Channel;
-import lombok.RequiredArgsConstructor;
-import lombok.Setter;
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.importer.ImporterType;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
-import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
-import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporter;
-import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
-import org.apache.shardingsphere.data.pipeline.cdc.util.CDCDataRecordUtils;
-import org.apache.shardingsphere.data.pipeline.cdc.util.DataRecordResultConvertUtils;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
-import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * Socket sink importer connector.
- */
-@Slf4j
-public final class SocketSinkImporterConnector implements ImporterConnector, AutoCloseable {
-    
-    private static final long DEFAULT_TIMEOUT_MILLISECONDS = 200L;
-    
-    private final Lock lock = new ReentrantLock();
-    
-    private final Condition condition = lock.newCondition();
-    
-    @Setter
-    private volatile boolean incrementalTaskRunning = true;
-    
-    private final ShardingSphereDatabase database;
-    
-    private final Channel channel;
-    
-    private final int jobShardingCount;
-    
-    private final Comparator<DataRecord> dataRecordComparator;
-    
-    private final Map<String, String> tableNameSchemaMap = new HashMap<>();
-    
-    private final Map<SocketSinkImporter, BlockingQueue<List<DataRecord>>> incrementalRecordMap = new ConcurrentHashMap<>();
-    
-    private final AtomicInteger runningIncrementalTaskCount = new AtomicInteger(0);
-    
-    private Thread incrementalImporterTask;
-    
-    public SocketSinkImporterConnector(final Channel channel, final ShardingSphereDatabase database, final int jobShardingCount, final Collection<String> schemaTableNames,
-                                       final Comparator<DataRecord> dataRecordComparator) {
-        this.channel = channel;
-        this.database = database;
-        this.jobShardingCount = jobShardingCount;
-        schemaTableNames.stream().filter(each -> each.contains(".")).forEach(each -> {
-            String[] split = each.split("\\.");
-            tableNameSchemaMap.put(split[1], split[0]);
-        });
-        this.dataRecordComparator = dataRecordComparator;
-    }
-    
-    @Override
-    public Object getConnector() {
-        return channel;
-    }
-    
-    /**
-     * Write data record into channel.
-     *
-     * @param recordList data records
-     * @param socketSinkImporter cdc importer
-     * @param importerType importer type
-     */
-    public void write(final List<Record> recordList, final SocketSinkImporter socketSinkImporter, final ImporterType importerType) {
-        if (recordList.isEmpty()) {
-            return;
-        }
-        if (ImporterType.INVENTORY == importerType || null == dataRecordComparator) {
-            int dataRecordCount = (int) recordList.stream().filter(DataRecord.class::isInstance).count();
-            Record lastRecord = recordList.get(recordList.size() - 1);
-            if (lastRecord instanceof FinishedRecord && 0 == dataRecordCount) {
-                socketSinkImporter.ackWithLastDataRecord(new CDCAckPosition(lastRecord, 0));
-                return;
-            }
-            Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap = new HashMap<>();
-            importerDataRecordMap.put(socketSinkImporter, new CDCAckPosition(lastRecord, dataRecordCount));
-            writeImmediately(recordList, importerDataRecordMap);
-        } else if (ImporterType.INCREMENTAL == importerType) {
-            writeIntoQueue(recordList, socketSinkImporter);
-        }
-    }
-    
-    private void writeImmediately(final List<? extends Record> recordList, final Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap) {
-        doAwait();
-        if (!channel.isActive()) {
-            return;
-        }
-        List<DataRecordResult.Record> records = new LinkedList<>();
-        for (Record each : recordList) {
-            if (!(each instanceof DataRecord)) {
-                continue;
-            }
-            DataRecord dataRecord = (DataRecord) each;
-            records.add(DataRecordResultConvertUtils.convertDataRecordToRecord(database.getName(), tableNameSchemaMap.get(dataRecord.getTableName()), dataRecord));
-        }
-        String ackId = CDCAckHolder.getInstance().bindAckIdWithPosition(importerDataRecordMap);
-        DataRecordResult dataRecordResult = DataRecordResult.newBuilder().addAllRecord(records).setAckId(ackId).build();
-        channel.writeAndFlush(CDCResponseGenerator.succeedBuilder("").setDataRecordResult(dataRecordResult).build());
-    }
-    
-    @SuppressWarnings("ResultOfMethodCallIgnored")
-    private void doAwait() {
-        while (!channel.isWritable() && channel.isActive()) {
-            lock.lock();
-            try {
-                condition.await(DEFAULT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS);
-            } catch (final InterruptedException ignored) {
-                Thread.currentThread().interrupt();
-            } finally {
-                lock.unlock();
-            }
-        }
-    }
-    
-    @SneakyThrows(InterruptedException.class)
-    private void writeIntoQueue(final List<Record> dataRecords, final SocketSinkImporter socketSinkImporter) {
-        BlockingQueue<List<DataRecord>> blockingQueue = incrementalRecordMap.get(socketSinkImporter);
-        if (null == blockingQueue) {
-            log.warn("not find the queue to write");
-            return;
-        }
-        Map<Long, List<DataRecord>> recordsMap = new LinkedHashMap<>();
-        for (Record each : dataRecords) {
-            if (!(each instanceof DataRecord)) {
-                continue;
-            }
-            DataRecord dataRecord = (DataRecord) each;
-            // TODO need improve if support global transaction
-            recordsMap.computeIfAbsent(dataRecord.getCsn(), ignored -> new LinkedList<>()).add(dataRecord);
-        }
-        for (List<DataRecord> each : recordsMap.values()) {
-            blockingQueue.put(each);
-        }
-    }
-    
-    /**
-     * Send incremental start event.
-     *
-     * @param socketSinkImporter socket sink importer
-     * @param batchSize batch size
-     */
-    public void sendIncrementalStartEvent(final SocketSinkImporter socketSinkImporter, final int batchSize) {
-        incrementalRecordMap.computeIfAbsent(socketSinkImporter, ignored -> new ArrayBlockingQueue<>(batchSize));
-        int count = runningIncrementalTaskCount.incrementAndGet();
-        if (count < jobShardingCount || null == dataRecordComparator) {
-            return;
-        }
-        log.debug("start CDC incremental importer");
-        if (null == incrementalImporterTask) {
-            incrementalImporterTask = new Thread(new CDCIncrementalImporterTask(batchSize));
-            incrementalImporterTask.start();
-        }
-    }
-    
-    /**
-     * Clean socket sink importer connector.
-     *
-     * @param socketSinkImporter CDC importer
-     */
-    public void clean(final SocketSinkImporter socketSinkImporter) {
-        incrementalRecordMap.remove(socketSinkImporter);
-        if (ImporterType.INCREMENTAL == socketSinkImporter.getImporterType()) {
-            incrementalTaskRunning = false;
-        }
-    }
-    
-    @Override
-    public String getType() {
-        return CDCSinkType.SOCKET.name();
-    }
-    
-    @Override
-    public void close() throws Exception {
-        channel.close();
-    }
-    
-    @RequiredArgsConstructor
-    private final class CDCIncrementalImporterTask implements Runnable {
-        
-        private final int batchSize;
-        
-        @SneakyThrows(InterruptedException.class)
-        @Override
-        public void run() {
-            while (incrementalTaskRunning) {
-                Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap = new HashMap<>();
-                List<DataRecord> dataRecords = new LinkedList<>();
-                for (int i = 0; i < batchSize; i++) {
-                    List<DataRecord> minimumRecords = CDCDataRecordUtils.findMinimumDataRecordsAndSavePosition(incrementalRecordMap, dataRecordComparator, cdcAckPositionMap);
-                    if (minimumRecords.isEmpty()) {
-                        break;
-                    }
-                    dataRecords.addAll(minimumRecords);
-                }
-                if (dataRecords.isEmpty()) {
-                    Thread.sleep(200L);
-                } else {
-                    writeImmediately(dataRecords, cdcAckPositionMap);
-                }
-            }
-        }
-    }
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/connector/ImporterConnector.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCChannelProgressPair.java
similarity index 60%
copy from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/connector/ImporterConnector.java
copy to kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCChannelProgressPair.java
index 89a58775c15..63ec0a1ccf3 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/connector/ImporterConnector.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCChannelProgressPair.java
@@ -15,24 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.spi.importer.connector;
+package org.apache.shardingsphere.data.pipeline.cdc.core.importer;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 
 /**
- * Importer connector.
+ * CDC channel and progress listener pair.
  */
-public interface ImporterConnector {
+@RequiredArgsConstructor
+@Getter
+public final class CDCChannelProgressPair {
     
-    /**
-     * Get connector.
-     *
-     * @return connector
-     */
-    Object getConnector();
+    private final PipelineChannel channel;
     
-    /**
-     * Connector type.
-     *
-     * @return connector type
-     */
-    String getType();
+    private final PipelineJobProgressListener jobProgressListener;
 }
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
new file mode 100644
index 00000000000..f5124b3bfbd
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.importer;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckId;
+import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
+import org.apache.shardingsphere.data.pipeline.spi.importer.sink.PipelineSink;
+import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * CDC importer.
+ */
+@RequiredArgsConstructor
+@Slf4j
+public final class CDCImporter extends AbstractLifecycleExecutor implements Importer {
+    
+    @Getter
+    private final String importerId = RandomStringUtils.randomAlphanumeric(8);
+    
+    private final List<CDCChannelProgressPair> originalChannelProgressPairs;
+    
+    private final int batchSize;
+    
+    private final long timeout;
+    
+    private final TimeUnit timeUnit;
+    
+    private final PipelineSink sink;
+    
+    private final boolean needSorting;
+    
+    private final JobRateLimitAlgorithm rateLimitAlgorithm;
+    
+    private final PriorityQueue<CSNRecords> csnRecordsQueue = new PriorityQueue<>(new CSNRecordsComparator());
+    
+    private final Cache<String, Pair<CDCChannelProgressPair, CDCAckPosition>> ackCache = Caffeine.newBuilder().maximumSize(10000).expireAfterAccess(5, TimeUnit.MINUTES).build();
+    
+    @Override
+    protected void runBlocking() {
+        CDCImporterManager.putImporter(this);
+        List<CDCChannelProgressPair> channelProgressPairs = new ArrayList<>(originalChannelProgressPairs);
+        while (isRunning()) {
+            if (needSorting) {
+                doWithSorting(channelProgressPairs);
+            } else {
+                doWithoutSorting(channelProgressPairs);
+            }
+            if (channelProgressPairs.isEmpty()) {
+                break;
+            }
+        }
+    }
+    
+    private void doWithoutSorting(final List<CDCChannelProgressPair> channelProgressPairs) {
+        Iterator<CDCChannelProgressPair> channelProgressPairsIterator = channelProgressPairs.iterator();
+        while (channelProgressPairsIterator.hasNext()) {
+            CDCChannelProgressPair channelProgressPair = channelProgressPairsIterator.next();
+            PipelineChannel channel = channelProgressPair.getChannel();
+            List<Record> records = channel.fetchRecords(batchSize, timeout, timeUnit).stream().filter(each -> !(each instanceof PlaceholderRecord)).collect(Collectors.toList());
+            if (records.isEmpty()) {
+                continue;
+            }
+            if (null != rateLimitAlgorithm) {
+                rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
+            }
+            String ackId = CDCAckId.build(importerId).marshal();
+            ackCache.put(ackId, Pair.of(channelProgressPair, new CDCAckPosition(records.get(records.size() - 1), getDataRecordsCount(records))));
+            sink.write(ackId, records);
+            Record lastRecord = records.get(records.size() - 1);
+            if (lastRecord instanceof FinishedRecord) {
+                channelProgressPairsIterator.remove();
+            }
+            if (lastRecord instanceof FinishedRecord && records.stream().noneMatch(DataRecord.class::isInstance)) {
+                channel.ack(records);
+                channelProgressPair.getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
+            }
+        }
+    }
+    
+    @SneakyThrows(InterruptedException.class)
+    private void doWithSorting(final List<CDCChannelProgressPair> channelProgressPairs) {
+        if (null != rateLimitAlgorithm) {
+            rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
+        }
+        prepareTransactionRecords(channelProgressPairs);
+        CSNRecords csnRecords = csnRecordsQueue.poll();
+        if (null == csnRecords) {
+            timeUnit.sleep(timeout);
+            return;
+        }
+        // TODO Combine small transactions into a large transaction, to improve transformation performance.
+        String ackId = CDCAckId.build(importerId).marshal();
+        List<Record> records = csnRecords.getRecords();
+        ackCache.put(ackId, Pair.of(csnRecords.getChannelProgressPair(), new CDCAckPosition(records.get(records.size() - 1), getDataRecordsCount(records))));
+        sink.write(ackId, filterDataRecords(records));
+    }
+    
+    private int getDataRecordsCount(final List<Record> records) {
+        return (int) records.stream().filter(DataRecord.class::isInstance).count();
+    }
+    
+    private List<Record> filterDataRecords(final List<Record> records) {
+        return records.stream().filter(DataRecord.class::isInstance).map(each -> (DataRecord) each).collect(Collectors.toList());
+    }
+    
+    // TODO openGauss CSN should be incremented for every transaction. Currently, CSN might be duplicated in transactions.
+    // TODO Use channels watermark depth to improve performance.
+    private void prepareTransactionRecords(final List<CDCChannelProgressPair> channelProgressPairs) {
+        if (csnRecordsQueue.isEmpty()) {
+            for (CDCChannelProgressPair each : channelProgressPairs) {
+                PipelineChannel channel = each.getChannel();
+                List<Record> records = channel.pollRecords();
+                if (records.isEmpty()) {
+                    continue;
+                }
+                if (0 == getDataRecordsCount(records)) {
+                    channel.ack(records);
+                    continue;
+                }
+                csnRecordsQueue.add(new CSNRecords(findFirstDataRecord(records).getCsn(), each, records));
+            }
+        } else {
+            CSNRecords csnRecords = csnRecordsQueue.peek();
+            long oldestCSN = findFirstDataRecord(csnRecords.getRecords()).getCsn();
+            for (CDCChannelProgressPair each : channelProgressPairs) {
+                PipelineChannel channel = each.getChannel();
+                List<Record> records = channel.peekRecords();
+                if (records.isEmpty()) {
+                    continue;
+                }
+                if (0 == getDataRecordsCount(records)) {
+                    records = channel.pollRecords();
+                    channel.ack(records);
+                    continue;
+                }
+                long csn = findFirstDataRecord(records).getCsn();
+                if (csn < oldestCSN) {
+                    records = channel.pollRecords();
+                    csnRecordsQueue.add(new CSNRecords(csn, each, records));
+                }
+            }
+        }
+    }
+    
+    private DataRecord findFirstDataRecord(final List<Record> records) {
+        for (Record each : records) {
+            if (each instanceof DataRecord) {
+                return (DataRecord) each;
+            }
+        }
+        throw new IllegalStateException("No data record found");
+    }
+    
+    /**
+     * Ack.
+     *
+     * @param ackId ack id
+     */
+    public void ack(final String ackId) {
+        Pair<CDCChannelProgressPair, CDCAckPosition> channelPositionPair = ackCache.getIfPresent(ackId);
+        if (null == channelPositionPair) {
+            log.warn("Could not find cached ack info, ack id: {}", ackId);
+            return;
+        }
+        CDCAckPosition ackPosition = channelPositionPair.getRight();
+        channelPositionPair.getLeft().getChannel().ack(Collections.singletonList(ackPosition.getLastRecord()));
+        channelPositionPair.getLeft().getJobProgressListener().onProgressUpdated(new PipelineJobProgressUpdatedParameter(ackPosition.getDataRecordCount()));
+    }
+    
+    @Override
+    protected void doStop() {
+        CDCImporterManager.removeImporter(importerId);
+    }
+}
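
For orientation, @RequiredArgsConstructor generates a constructor over the seven final, uninitialized fields above. An illustrative construction (the values are assumptions, not the project's defaults):

    CDCImporter importer = new CDCImporter(
            channelProgressPairs,            // List<CDCChannelProgressPair>
            1000,                            // batchSize per fetch
            500L, TimeUnit.MILLISECONDS,     // timeout and timeUnit for fetch/sleep
            sink,                            // PipelineSink implementation
            true,                            // needSorting: merge transactions by CSN
            null);                           // JobRateLimitAlgorithm, optional
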
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterManager.java
similarity index 52%
copy from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
copy to kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterManager.java
index 1b090ee9b2a..62f684da2a1 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterManager.java
@@ -15,46 +15,43 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.task;
+package org.apache.shardingsphere.data.pipeline.cdc.core.importer;
 
-import org.apache.shardingsphere.data.pipeline.api.task.progress.TaskProgress;
-
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * Pipeline task interface.
+ * CDC importer manager.
  */
-public interface PipelineTask {
-    
-    /**
-     * Start task.
-     *
-     * @return future
-     */
-    Collection<CompletableFuture<?>> start();
+public final class CDCImporterManager {
     
-    /**
-     * Stop task.
-     */
-    void stop();
+    private static final Map<String, CDCImporter> IMPORTER_MAP = new ConcurrentHashMap<>();
     
     /**
-     * Get task id.
+     * Put importer.
      *
-     * @return task id
+     * @param importer importer
      */
-    String getTaskId();
+    public static void putImporter(final CDCImporter importer) {
+        IMPORTER_MAP.put(importer.getImporterId(), importer);
+    }
     
     /**
-     * Get task progress.
+     * Get importer.
      *
-     * @return task progress
+     * @param id importer id
+     * @return importer
      */
-    TaskProgress getTaskProgress();
+    public static CDCImporter getImporter(final String id) {
+        return IMPORTER_MAP.get(id);
+    }
     
     /**
-     * Close.
+     * Remove importer.
+     *
+     * @param id importer id
      */
-    void close();
+    public static void removeImporter(final String id) {
+        IMPORTER_MAP.remove(id);
+    }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/connector/ImporterConnector.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CSNRecords.java
similarity index 65%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/connector/ImporterConnector.java
rename to kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CSNRecords.java
index 89a58775c15..7e6d3469eed 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/connector/ImporterConnector.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CSNRecords.java
@@ -15,24 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.spi.importer.connector;
+package org.apache.shardingsphere.data.pipeline.cdc.core.importer;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+
+import java.util.List;
 
 /**
- * Importer connector.
+ * CSN records.
  */
-public interface ImporterConnector {
+@RequiredArgsConstructor
+@Getter
+public final class CSNRecords {
+    
+    private final long csn;
     
-    /**
-     * Get connector.
-     *
-     * @return connector
-     */
-    Object getConnector();
+    private final CDCChannelProgressPair channelProgressPair;
     
-    /**
-     * Connector type.
-     *
-     * @return connector type
-     */
-    String getType();
+    private final List<Record> records;
 }
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporterConnector.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CSNRecordsComparator.java
similarity index 66%
copy from test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporterConnector.java
copy to kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CSNRecordsComparator.java
index 0953a373026..9b65c8b7130 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporterConnector.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CSNRecordsComparator.java
@@ -15,19 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
+package org.apache.shardingsphere.data.pipeline.cdc.core.importer;
 
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 
-public final class FixtureImporterConnector implements ImporterConnector {
-    
-    @Override
-    public Object getConnector() {
-        return null;
-    }
+import java.util.Comparator;
+
+/**
+ * CSN records comparator.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class CSNRecordsComparator implements Comparator<CSNRecords> {
     
     @Override
-    public String getType() {
-        return "FIXTURE";
+    public int compare(final CSNRecords o1, final CSNRecords o2) {
+        return Long.compare(o1.getCsn(), o2.getCsn());
     }
 }
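
This comparator backs the PriorityQueue in CDCImporter, so the transaction batch with the smallest CSN is emitted first. A small ordering sketch (pairs and record lists are placeholders):

    PriorityQueue<CSNRecords> queue = new PriorityQueue<>(new CSNRecordsComparator());
    queue.add(new CSNRecords(300L, pairA, recordsA));
    queue.add(new CSNRecords(100L, pairB, recordsB));
    assert 100L == queue.peek().getCsn();   // lowest CSN surfaces first
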
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporter.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporter.java
deleted file mode 100644
index a6eb2942911..00000000000
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporter.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.core.importer;
-
-import lombok.AccessLevel;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
-import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
-import org.apache.shardingsphere.data.pipeline.api.importer.ImporterType;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
-import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
-import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
-import org.apache.shardingsphere.data.pipeline.cdc.core.connector.SocketSinkImporterConnector;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
-import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-/**
- * Socket sink importer.
- */
-@Slf4j
-public final class SocketSinkImporter extends AbstractLifecycleExecutor implements Importer {
-    
-    @Getter(AccessLevel.PROTECTED)
-    private final ImporterConfiguration importerConfig;
-    
-    private final PipelineChannel channel;
-    
-    private final SocketSinkImporterConnector importerConnector;
-    
-    private final PipelineJobProgressListener jobProgressListener;
-    
-    @Getter
-    private final ImporterType importerType;
-    
-    private final JobRateLimitAlgorithm rateLimitAlgorithm;
-    
-    public SocketSinkImporter(final ImporterConfiguration importerConfig, final ImporterConnector importerConnector, final PipelineChannel channel,
-                              final PipelineJobProgressListener jobProgressListener, final ImporterType importerType) {
-        this.importerConfig = importerConfig;
-        rateLimitAlgorithm = null == importerConfig ? null : importerConfig.getRateLimitAlgorithm();
-        this.channel = channel;
-        this.importerConnector = (SocketSinkImporterConnector) importerConnector;
-        this.jobProgressListener = jobProgressListener;
-        this.importerType = importerType;
-    }
-    
-    @Override
-    protected void runBlocking() {
-        int batchSize = importerConfig.getBatchSize();
-        if (ImporterType.INCREMENTAL == importerType) {
-            importerConnector.sendIncrementalStartEvent(this, batchSize);
-        }
-        while (isRunning()) {
-            List<Record> records = channel.fetchRecords(batchSize, 500, TimeUnit.MILLISECONDS);
-            if (!records.isEmpty()) {
-                List<Record> recordList = records.stream().filter(each -> !(each instanceof PlaceholderRecord)).collect(Collectors.toList());
-                processDataRecords(recordList);
-                if (FinishedRecord.class.equals(records.get(records.size() - 1).getClass())) {
-                    break;
-                }
-            }
-        }
-    }
-    
-    private void processDataRecords(final List<Record> recordList) {
-        if (null == recordList || recordList.isEmpty()) {
-            return;
-        }
-        if (null != rateLimitAlgorithm) {
-            rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
-        }
-        importerConnector.write(recordList, this, importerType);
-    }
-    
-    /**
-     * Ack with last data record.
-     *
-     * @param cdcAckPosition cdc ack position
-     */
-    public void ackWithLastDataRecord(final CDCAckPosition cdcAckPosition) {
-        channel.ack(Collections.singletonList(cdcAckPosition.getLastRecord()));
-        jobProgressListener.onProgressUpdated(new PipelineJobProgressUpdatedParameter(cdcAckPosition.getDataRecordCount()));
-    }
-    
-    @Override
-    protected void doStop() {
-        if (ImporterType.INCREMENTAL == importerType) {
-            importerConnector.clean(this);
-            CDCAckHolder.getInstance().cleanUp(this);
-        }
-    }
-}
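
The removed runBlocking() above captures the fetch loop the old importer ran: drain a
batch from the channel, drop placeholder records, write the rest, and stop once a
FinishedRecord closes the stream. The same loop, sketched with stand-in types rather
than the project's interfaces:

    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.stream.Collectors;
    
    public final class FetchLoopSketch {
        
        private interface Record { }
        
        private static final class PlaceholderRecord implements Record { }
        
        private static final class FinishedRecord implements Record { }
        
        private static final class DataRecord implements Record { }
        
        public static void main(final String[] args) {
            BlockingQueue<Record> channel = new LinkedBlockingQueue<>();
            channel.add(new DataRecord());
            channel.add(new PlaceholderRecord());
            channel.add(new FinishedRecord());
            boolean running = true;
            while (running) {
                List<Record> records = new LinkedList<>();
                channel.drainTo(records, 100);
                if (records.isEmpty()) {
                    continue;
                }
                // Placeholder records only carry positions; filter them before writing.
                List<Record> dataRecords = records.stream().filter(each -> !(each instanceof PlaceholderRecord)).collect(Collectors.toList());
                System.out.println("write " + dataRecords.size() + " record(s)");
                // A trailing FinishedRecord ends the stream, as in the removed code.
                running = !(records.get(records.size() - 1) instanceof FinishedRecord);
            }
        }
    }
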
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreator.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreator.java
deleted file mode 100644
index bea3b61c1da..00000000000
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreator.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.core.importer;
-
-import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
-import org.apache.shardingsphere.data.pipeline.api.importer.ImporterType;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
-import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
-
-/**
- * Socket sink importer creator.
- */
-public final class SocketSinkImporterCreator implements ImporterCreator {
-    
-    @Override
-    public Importer createImporter(final ImporterConfiguration importerConfig, final ImporterConnector importerConnector, final PipelineChannel channel,
-                                   final PipelineJobProgressListener jobProgressListener, final ImporterType importerType) {
-        return new SocketSinkImporter(importerConfig, importerConnector, channel, jobProgressListener, importerType);
-    }
-    
-    @Override
-    public String getType() {
-        return CDCSinkType.SOCKET.name();
-    }
-}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
new file mode 100644
index 00000000000..155d5b6d332
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink;
+
+import io.netty.channel.Channel;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
+import org.apache.shardingsphere.data.pipeline.cdc.util.DataRecordResultConvertUtils;
+import org.apache.shardingsphere.data.pipeline.spi.importer.sink.PipelineSink;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * CDC socket sink.
+ */
+@Slf4j
+public final class CDCSocketSink implements PipelineSink {
+    
+    private static final long DEFAULT_TIMEOUT_MILLISECONDS = 200L;
+    
+    private final Lock lock = new ReentrantLock();
+    
+    private final Condition condition = lock.newCondition();
+    
+    private final ShardingSphereDatabase database;
+    
+    private final Channel channel;
+    
+    private final Map<String, String> tableNameSchemaMap = new HashMap<>();
+    
+    public CDCSocketSink(final Channel channel, final ShardingSphereDatabase database, final Collection<String> schemaTableNames) {
+        this.channel = channel;
+        this.database = database;
+        schemaTableNames.stream().filter(each -> each.contains(".")).forEach(each -> {
+            String[] split = each.split("\\.");
+            tableNameSchemaMap.put(split[1], split[0]);
+        });
+    }
+    
+    @Override
+    public boolean identifierMatched(final Object identifier) {
+        return channel.id().equals(identifier);
+    }
+    
+    @Override
+    public PipelineJobProgressUpdatedParameter write(final String ackId, final List<Record> records) {
+        if (records.isEmpty()) {
+            return new PipelineJobProgressUpdatedParameter(0);
+        }
+        while (!channel.isWritable() && channel.isActive()) {
+            doAwait();
+        }
+        if (!channel.isActive()) {
+            return new PipelineJobProgressUpdatedParameter(0);
+        }
+        List<DataRecordResult.Record> resultRecords = new LinkedList<>();
+        for (Record each : records) {
+            if (!(each instanceof DataRecord)) {
+                continue;
+            }
+            DataRecord dataRecord = (DataRecord) each;
+            resultRecords.add(DataRecordResultConvertUtils.convertDataRecordToRecord(database.getName(), tableNameSchemaMap.get(dataRecord.getTableName()), dataRecord));
+        }
+        DataRecordResult dataRecordResult = DataRecordResult.newBuilder().addAllRecord(resultRecords).setAckId(ackId).build();
+        channel.writeAndFlush(CDCResponseGenerator.succeedBuilder("").setDataRecordResult(dataRecordResult).build());
+        return new PipelineJobProgressUpdatedParameter(resultRecords.size());
+    }
+    
+    private void doAwait() {
+        lock.lock();
+        try {
+            condition.await(DEFAULT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS);
+        } catch (final InterruptedException ignored) {
+            Thread.currentThread().interrupt();
+        } finally {
+            lock.unlock();
+        }
+    }
+    
+    @Override
+    public void close() throws IOException {
+        channel.close();
+    }
+}
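
The write() path above parks the caller while the Netty channel is unwritable,
re-checking every 200 ms. A standalone sketch of that backpressure idiom, with a plain
AtomicBoolean standing in for channel.isWritable():

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public final class BackpressureAwaitSketch {
        
        private static final long TIMEOUT_MILLIS = 200L;
        
        private final Lock lock = new ReentrantLock();
        
        private final Condition condition = lock.newCondition();
        
        private final AtomicBoolean writable = new AtomicBoolean(false);
        
        private void awaitWritable() throws InterruptedException {
            // Timed await instead of busy-spinning; the flag is re-checked in a loop.
            while (!writable.get()) {
                lock.lock();
                try {
                    condition.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                } finally {
                    lock.unlock();
                }
            }
        }
        
        private void onWritabilityChanged() {
            writable.set(true);
            lock.lock();
            try {
                condition.signalAll();
            } finally {
                lock.unlock();
            }
        }
        
        public static void main(final String[] args) throws InterruptedException {
            BackpressureAwaitSketch sketch = new BackpressureAwaitSketch();
            new Thread(() -> {
                try {
                    Thread.sleep(300L);
                } catch (final InterruptedException ignored) {
                    Thread.currentThread().interrupt();
                }
                sketch.onWritabilityChanged();
            }).start();
            sketch.awaitWritable();
            System.out.println("channel writable again, resume writing");
        }
    }
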
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index ee6ea603e61..314e1e16c53 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -17,9 +17,13 @@
 
 package org.apache.shardingsphere.data.pipeline.cdc.core.job;
 
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
 import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
@@ -30,22 +34,32 @@ import org.apache.shardingsphere.data.pipeline.cdc.context.job.CDCJobItemContext
 import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
 import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
 import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.core.util.CloseUtils;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import org.apache.shardingsphere.data.pipeline.spi.importer.sink.PipelineSink;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
 
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * CDC job.
  */
 @Slf4j
-public final class CDCJob extends AbstractSimplePipelineJob {
+public final class CDCJob extends AbstractPipelineJob implements SimpleJob {
     
-    private final ImporterConnector importerConnector;
+    @Getter
+    private final PipelineSink sink;
     
     private final CDCJobAPI jobAPI = new CDCJobAPI();
     
@@ -53,37 +67,147 @@ public final class CDCJob extends AbstractSimplePipelineJob {
     
     private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
     
-    public CDCJob(final String jobId, final ImporterConnector importerConnector) {
+    public CDCJob(final String jobId, final PipelineSink sink) {
         super(jobId);
-        this.importerConnector = importerConnector;
+        this.sink = sink;
     }
     
     @Override
-    protected void doPrepare(final PipelineJobItemContext jobItemContext) {
-        jobPreparer.initTasks((CDCJobItemContext) jobItemContext);
+    public void execute(final ShardingContext shardingContext) {
+        String jobId = shardingContext.getJobName();
+        log.info("Execute job {}", jobId);
+        CDCJobConfiguration jobConfig = new YamlCDCJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
+        List<CDCJobItemContext> jobItemContexts = new LinkedList<>();
+        for (int shardingItem = 0; shardingItem < jobConfig.getJobShardingCount(); shardingItem++) {
+            if (isStopping()) {
+                log.info("stopping true, ignore");
+                return;
+            }
+            CDCJobItemContext jobItemContext = buildPipelineJobItemContext(jobConfig, shardingItem);
+            PipelineTasksRunner tasksRunner = new CDCTasksRunner(jobItemContext);
+            if (!addTasksRunner(shardingItem, tasksRunner)) {
+                continue;
+            }
+            jobItemContexts.add(jobItemContext);
+            jobAPI.cleanJobItemErrorMessage(jobId, shardingItem);
+            log.info("start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
+        }
+        if (jobItemContexts.isEmpty()) {
+            log.warn("job item contexts empty, ignore");
+            return;
+        }
+        prepare(jobItemContexts);
+        executeInventoryTasks(jobItemContexts);
+        executeIncrementalTasks(jobItemContexts);
     }
     
-    @Override
-    protected PipelineJobItemContext buildPipelineJobItemContext(final ShardingContext shardingContext) {
-        int shardingItem = shardingContext.getShardingItem();
-        CDCJobConfiguration jobConfig = new YamlCDCJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
-        Optional<InventoryIncrementalJobItemProgress> initProgress = jobAPI.getJobItemProgress(shardingContext.getJobName(), shardingItem);
+    private CDCJobItemContext buildPipelineJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem) {
+        Optional<InventoryIncrementalJobItemProgress> initProgress = jobAPI.getJobItemProgress(jobConfig.getJobId(), shardingItem);
         CDCProcessContext jobProcessContext = jobAPI.buildPipelineProcessContext(jobConfig);
         CDCTaskConfiguration taskConfig = jobAPI.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
-        return new CDCJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, importerConnector);
+        return new CDCJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, sink);
+    }
+    
+    private void prepare(final Collection<CDCJobItemContext> jobItemContexts) {
+        try {
+            jobPreparer.initTasks(jobItemContexts);
+            // CHECKSTYLE:OFF
+        } catch (final RuntimeException ex) {
+            // CHECKSTYLE:ON
+            for (PipelineJobItemContext each : jobItemContexts) {
+                processFailed(each, ex);
+            }
+            throw ex;
+            // CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            // CHECKSTYLE:ON
+            for (PipelineJobItemContext each : jobItemContexts) {
+                processFailed(each, ex);
+            }
+            throw new PipelineInternalException(ex);
+        }
     }
     
     @Override
-    protected PipelineTasksRunner buildPipelineTasksRunner(final PipelineJobItemContext pipelineJobItemContext) {
-        InventoryIncrementalJobItemContext jobItemContext = (InventoryIncrementalJobItemContext) pipelineJobItemContext;
-        return new CDCTasksRunner(jobItemContext, jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
+    protected void processFailed(final PipelineJobItemContext jobItemContext, final Exception ex) {
+        String jobId = jobItemContext.getJobId();
+        log.error("job prepare failed, {}-{}", jobId, jobItemContext.getShardingItem(), ex);
+        jobAPI.persistJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem(), ex);
+        PipelineJobCenter.stop(jobId);
+        jobAPI.updateJobConfigurationDisabled(jobId, true);
+    }
+    
+    private void executeInventoryTasks(final List<CDCJobItemContext> jobItemContexts) {
+        Collection<CompletableFuture<?>> futures = new LinkedList<>();
+        for (CDCJobItemContext each : jobItemContexts) {
+            updateLocalAndRemoteJobItemStatus(each, JobStatus.EXECUTE_INVENTORY_TASK);
+            for (PipelineTask task : each.getInventoryTasks()) {
+                if (task.getTaskProgress().getPosition() instanceof FinishedPosition) {
+                    continue;
+                }
+                futures.addAll(task.start());
+            }
+        }
+        if (futures.isEmpty()) {
+            return;
+        }
+        ExecuteEngine.trigger(futures, new CDCExecuteCallback("inventory", jobItemContexts.get(0)));
+    }
+    
+    private void updateLocalAndRemoteJobItemStatus(final PipelineJobItemContext jobItemContext, final JobStatus jobStatus) {
+        jobItemContext.setStatus(jobStatus);
+        jobAPI.updateJobItemStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
+    }
+    
+    private void executeIncrementalTasks(final List<CDCJobItemContext> jobItemContexts) {
+        log.info("execute incremental tasks, jobId={}", getJobId());
+        Collection<CompletableFuture<?>> futures = new LinkedList<>();
+        for (CDCJobItemContext each : jobItemContexts) {
+            if (JobStatus.EXECUTE_INCREMENTAL_TASK == each.getStatus()) {
+                log.info("job status already EXECUTE_INCREMENTAL_TASK, ignore");
+                return;
+            }
+            updateLocalAndRemoteJobItemStatus(each, JobStatus.EXECUTE_INCREMENTAL_TASK);
+            for (PipelineTask task : each.getIncrementalTasks()) {
+                if (task.getTaskProgress().getPosition() instanceof FinishedPosition) {
+                    continue;
+                }
+                futures.addAll(task.start());
+            }
+        }
+        ExecuteEngine.trigger(futures, new CDCExecuteCallback("incremental", jobItemContexts.get(0)));
+    }
+    
+    @Override
+    protected void doPrepare(final PipelineJobItemContext jobItemContext) {
+        throw new UnsupportedOperationException();
     }
     
     @Override
     protected void doClean() {
         dataSourceManager.close();
-        if (importerConnector instanceof AutoCloseable) {
-            CloseUtils.closeQuietly((AutoCloseable) importerConnector);
+        CloseUtils.closeQuietly(sink);
+    }
+    
+    @RequiredArgsConstructor
+    private final class CDCExecuteCallback implements ExecuteCallback {
+        
+        private final String identifier;
+        
+        private final PipelineJobItemContext jobItemContext;
+        
+        @Override
+        public void onSuccess() {
+            log.info("onSuccess, all {} tasks finished.", identifier);
+        }
+        
+        @Override
+        public void onFailure(final Throwable throwable) {
+            log.error("onFailure, {} task execute failed.", identifier, throwable);
+            String jobId = jobItemContext.getJobId();
+            jobAPI.persistJobItemErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
+            PipelineJobCenter.stop(jobId);
+            jobAPI.updateJobConfigurationDisabled(jobId, true);
         }
     }
 }
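
executeInventoryTasks() and executeIncrementalTasks() collect one future per dumper and
importer, then hand the batch to ExecuteEngine.trigger with a callback. Assuming
nothing about ExecuteEngine beyond the onSuccess/onFailure contract visible above, the
fan-in can be sketched with CompletableFuture.allOf:

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.concurrent.CompletableFuture;
    
    public final class TriggerSketch {
        
        private interface Callback {
            
            void onSuccess();
            
            void onFailure(Throwable throwable);
        }
        
        private static CompletableFuture<Void> trigger(final Collection<CompletableFuture<?>> futures, final Callback callback) {
            // Complete when every task future completes; route the outcome to the callback.
            return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((unused, throwable) -> {
                if (null == throwable) {
                    callback.onSuccess();
                } else {
                    callback.onFailure(throwable);
                }
            });
        }
        
        public static void main(final String[] args) {
            Collection<CompletableFuture<?>> futures = Arrays.asList(
                    CompletableFuture.runAsync(() -> System.out.println("inventory task 1")),
                    CompletableFuture.runAsync(() -> System.out.println("inventory task 2")));
            trigger(futures, new Callback() {
                
                @Override
                public void onSuccess() {
                    System.out.println("onSuccess, all inventory tasks finished");
                }
                
                @Override
                public void onFailure(final Throwable throwable) {
                    System.out.println("onFailure: " + throwable);
                }
            }).join();
        }
    }
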
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index b5b5f52d2b1..5fb6519ca36 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -18,26 +18,45 @@
 package org.apache.shardingsphere.data.pipeline.cdc.core.prepare;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.api.importer.ImporterType;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
-import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
 import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
-import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
 import org.apache.shardingsphere.data.pipeline.cdc.context.job.CDCJobItemContext;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCChannelProgressPair;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
+import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCIncrementalTask;
+import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCInventoryTask;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
-import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.prepare.InventoryTaskSplitter;
 import org.apache.shardingsphere.data.pipeline.core.prepare.PipelineJobPreparerUtils;
-import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
-import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
+import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
 
 import java.sql.SQLException;
+import java.util.Collection;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * CDC job preparer.
@@ -50,9 +69,21 @@ public final class CDCJobPreparer {
     /**
      * Do prepare work.
      *
-     * @param jobItemContext job item context
+     * @param jobItemContexts job item contexts
      */
-    public void initTasks(final CDCJobItemContext jobItemContext) {
+    public void initTasks(final Collection<CDCJobItemContext> jobItemContexts) {
+        // TODO Use pipeline tree to build it
+        AtomicBoolean inventoryImporterUsed = new AtomicBoolean();
+        List<CDCChannelProgressPair> inventoryChannelProgressPairs = new LinkedList<>();
+        AtomicBoolean incrementalImporterUsed = new AtomicBoolean();
+        List<CDCChannelProgressPair> incrementalChannelProgressPairs = new LinkedList<>();
+        for (CDCJobItemContext each : jobItemContexts) {
+            initTasks0(each, inventoryImporterUsed, inventoryChannelProgressPairs, incrementalImporterUsed, incrementalChannelProgressPairs);
+        }
+    }
+    
+    private void initTasks0(final CDCJobItemContext jobItemContext, final AtomicBoolean inventoryImporterUsed, final List<CDCChannelProgressPair> inventoryChannelProgressPairs,
+                            final AtomicBoolean incrementalImporterUsed, final List<CDCChannelProgressPair> incrementalChannelProgressPairs) {
         Optional<InventoryIncrementalJobItemProgress> jobItemProgress = jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
         if (!jobItemProgress.isPresent()) {
             jobAPI.persistJobItemProgress(jobItemContext);
@@ -61,23 +92,14 @@ public final class CDCJobPreparer {
             PipelineJobCenter.stop(jobItemContext.getJobId());
             return;
         }
-        initIncrementalTasks(jobItemContext);
-        CDCJobConfiguration jobConfig = jobItemContext.getJobConfig();
-        if (jobConfig.isFull()) {
-            initInventoryTasks(jobItemContext);
+        prepareIncremental(jobItemContext);
+        if (jobItemContext.getJobConfig().isFull()) {
+            initInventoryTasks(jobItemContext, inventoryImporterUsed, inventoryChannelProgressPairs);
         }
+        initIncrementalTask(jobItemContext, incrementalImporterUsed, incrementalChannelProgressPairs);
     }
     
-    private void initInventoryTasks(final CDCJobItemContext jobItemContext) {
-        CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
-        // TODO channel requires a new implementation
-        InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(jobItemContext.getTaskConfig().getDumperConfig());
-        InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), inventoryDumperConfig, taskConfig.getImporterConfig());
-        List<InventoryTask> allInventoryTasks = inventoryTaskSplitter.splitInventoryData(jobItemContext);
-        jobItemContext.getInventoryTasks().addAll(allInventoryTasks);
-    }
-    
-    private void initIncrementalTasks(final CDCJobItemContext jobItemContext) {
+    private void prepareIncremental(final CDCJobItemContext jobItemContext) {
         CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
         JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
         try {
@@ -85,11 +107,51 @@ public final class CDCJobPreparer {
         } catch (final SQLException ex) {
             throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
         }
-        PipelineChannelCreator pipelineChannelCreator = jobItemContext.getJobProcessContext().getPipelineChannelCreator();
-        PipelineTableMetaDataLoader sourceMetaDataLoader = jobItemContext.getSourceMetaDataLoader();
-        ExecuteEngine incrementalExecuteEngine = jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
-        IncrementalTask incrementalTask = new IncrementalTask(taskConfig.getImporterConfig().getConcurrency(), taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
-                pipelineChannelCreator, jobItemContext.getImporterConnector(), sourceMetaDataLoader, incrementalExecuteEngine, jobItemContext);
+    }
+    
+    private void initInventoryTasks(final CDCJobItemContext jobItemContext, final AtomicBoolean importerUsed, final List<CDCChannelProgressPair> channelProgressPairs) {
+        long startTimeMillis = System.currentTimeMillis();
+        CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
+        ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
+        CDCProcessContext processContext = jobItemContext.getJobProcessContext();
+        for (InventoryDumperConfiguration each : new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), new InventoryDumperConfiguration(taskConfig.getDumperConfig()), importerConfig)
+                .splitInventoryDumperConfig(jobItemContext)) {
+            AtomicReference<IngestPosition> position = new AtomicReference<>(each.getPosition());
+            PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(), importerConfig.getBatchSize(), position);
+            channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext));
+            Dumper dumper = new InventoryDumper(each, channel, jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader());
+            Importer importer = importerUsed.get() ? null : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 3, TimeUnit.SECONDS, jobItemContext.getSink(),
+                    needSorting(ImporterType.INVENTORY, hasGlobalCSN(taskConfig.getDumperConfig().getDataSourceConfig().getDatabaseType())),
+                    importerConfig.getRateLimitAlgorithm());
+            jobItemContext.getInventoryTasks().add(new CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(),
+                    processContext.getInventoryImporterExecuteEngine(), dumper, importer, position));
+            importerUsed.set(true);
+        }
+        log.info("initInventoryTasks cost {} ms", System.currentTimeMillis() - startTimeMillis);
+    }
+    
+    private boolean needSorting(final ImporterType importerType, final boolean hasGlobalCSN) {
+        return ImporterType.INCREMENTAL == importerType && hasGlobalCSN;
+    }
+    
+    private boolean hasGlobalCSN(final DatabaseType databaseType) {
+        return databaseType instanceof OpenGaussDatabaseType;
+    }
+    
+    private void initIncrementalTask(final CDCJobItemContext jobItemContext, final AtomicBoolean importerUsed, final List<CDCChannelProgressPair> channelProgressPairs) {
+        CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
+        DumperConfiguration dumperConfig = taskConfig.getDumperConfig();
+        ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
+        IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperConfig.getPosition(), jobItemContext.getInitProgress());
+        PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(importerConfig.getConcurrency(), jobItemContext.getJobProcessContext().getPipelineChannelCreator(), taskProgress);
+        channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext));
+        Dumper dumper = PipelineTypedSPILoader.getDatabaseTypedService(IncrementalDumperCreator.class, dumperConfig.getDataSourceConfig().getDatabaseType().getType())
+                .createIncrementalDumper(dumperConfig, dumperConfig.getPosition(), channel, jobItemContext.getSourceMetaDataLoader());
+        boolean needSorting = needSorting(ImporterType.INCREMENTAL, hasGlobalCSN(importerConfig.getDataSourceConfig().getDatabaseType()));
+        Importer importer = importerUsed.get() ? null : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 300, TimeUnit.MILLISECONDS,
+                jobItemContext.getSink(), needSorting, importerConfig.getRateLimitAlgorithm());
+        PipelineTask incrementalTask = new CDCIncrementalTask(dumperConfig.getDataSourceName(), jobItemContext.getJobProcessContext().getIncrementalExecuteEngine(), dumper, importer, taskProgress);
         jobItemContext.getIncrementalTasks().add(incrementalTask);
+        importerUsed.set(true);
     }
 }
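
Two details of the preparer are worth spelling out. Batches are only globally sorted
when importing incrementally from openGauss, the one dialect here with a global CSN.
And a single CDCImporter is shared by all tasks: only the first task constructs it,
while every task registers its channel in the shared channelProgressPairs list. A
distilled, runnable sketch of that share-one-importer rule (all types are stand-ins):

    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    public final class SharedImporterSketch {
        
        private static final class Channel { }
        
        private static final class Importer {
            
            private final List<Channel> channels;
            
            Importer(final List<Channel> channels) {
                this.channels = channels;
            }
            
            int consumedChannelCount() {
                return channels.size();
            }
        }
        
        public static void main(final String[] args) {
            AtomicBoolean importerUsed = new AtomicBoolean();
            List<Channel> channelProgressPairs = new LinkedList<>();
            Importer sharedImporter = null;
            for (int i = 0; i < 3; i++) {
                // Every task contributes its channel; only the first task owns the importer.
                channelProgressPairs.add(new Channel());
                Importer importer = importerUsed.get() ? null : new Importer(channelProgressPairs);
                if (null != importer) {
                    sharedImporter = importer;
                }
                importerUsed.set(true);
            }
            // The list is shared by reference, so the one importer sees all 3 channels.
            System.out.println(sharedImporter.consumedChannelCount()); // 3
        }
    }
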
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCIncrementalTask.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCIncrementalTask.java
new file mode 100644
index 00000000000..ed872eb8f76
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCIncrementalTask.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.task;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
+import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
+import org.apache.shardingsphere.data.pipeline.core.task.TaskExecuteCallback;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * CDC incremental task.
+ */
+@RequiredArgsConstructor
+@Slf4j
+@ToString(exclude = {"incrementalExecuteEngine", "dumper", "importer", "taskProgress"})
+public final class CDCIncrementalTask implements PipelineTask {
+    
+    @Getter
+    private final String taskId;
+    
+    private final ExecuteEngine incrementalExecuteEngine;
+    
+    private final Dumper dumper;
+    
+    @Nullable
+    private final Importer importer;
+    
+    @Getter
+    private final IncrementalTaskProgress taskProgress;
+    
+    @Override
+    public Collection<CompletableFuture<?>> start() {
+        taskProgress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
+        Collection<CompletableFuture<?>> result = new LinkedList<>();
+        result.add(incrementalExecuteEngine.submit(dumper, new TaskExecuteCallback(this)));
+        if (null != importer) {
+            result.add(incrementalExecuteEngine.submit(importer, new TaskExecuteCallback(this)));
+        }
+        return result;
+    }
+    
+    @Override
+    public void stop() {
+        dumper.stop();
+        if (null != importer) {
+            importer.stop();
+        }
+    }
+    
+    @Override
+    public void close() {
+    }
+}
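
start() submits the dumper, plus the importer when this task owns one, to the execute
engine and returns the futures to the caller. Assuming only that the engine wraps an
executor, a minimal equivalent:

    import java.util.Collection;
    import java.util.LinkedList;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public final class SubmitSketch {
        
        private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(2);
        
        private static CompletableFuture<?> submit(final Runnable task) {
            return CompletableFuture.runAsync(task, EXECUTOR);
        }
        
        public static void main(final String[] args) {
            Runnable dumper = () -> System.out.println("dump incremental records");
            Runnable importer = () -> System.out.println("import incremental records");
            Collection<CompletableFuture<?>> futures = new LinkedList<>();
            futures.add(submit(dumper));
            // The importer is null on tasks that share another task's importer.
            if (null != importer) {
                futures.add(submit(importer));
            }
            futures.forEach(CompletableFuture::join);
            EXECUTOR.shutdown();
        }
    }
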
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCInventoryTask.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCInventoryTask.java
new file mode 100644
index 00000000000..eda3b485897
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCInventoryTask.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.task;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
+import org.apache.shardingsphere.data.pipeline.core.task.TaskExecuteCallback;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * CDC inventory task.
+ */
+@RequiredArgsConstructor
+@ToString(exclude = {"inventoryDumperExecuteEngine", "inventoryImporterExecuteEngine", "dumper", "importer"})
+@Slf4j
+public final class CDCInventoryTask implements PipelineTask {
+    
+    @Getter
+    private final String taskId;
+    
+    private final ExecuteEngine inventoryDumperExecuteEngine;
+    
+    private final ExecuteEngine inventoryImporterExecuteEngine;
+    
+    private final Dumper dumper;
+    
+    @Nullable
+    private final Importer importer;
+    
+    private final AtomicReference<IngestPosition> position;
+    
+    @Override
+    public Collection<CompletableFuture<?>> start() {
+        Collection<CompletableFuture<?>> result = new LinkedList<>();
+        result.add(inventoryDumperExecuteEngine.submit(dumper, new TaskExecuteCallback(this)));
+        if (null != importer) {
+            result.add(inventoryImporterExecuteEngine.submit(importer, new TaskExecuteCallback(this)));
+        }
+        return result;
+    }
+    
+    @Override
+    public void stop() {
+        dumper.stop();
+        if (null != importer) {
+            importer.stop();
+        }
+    }
+    
+    @Override
+    public InventoryTaskProgress getTaskProgress() {
+        return new InventoryTaskProgress(position.get());
+    }
+    
+    @Override
+    public void close() {
+    }
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java
index 9f78ddf90b8..60ed3fb3915 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java
@@ -18,22 +18,49 @@
 package org.apache.shardingsphere.data.pipeline.cdc.core.task;
 
 import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.core.task.InventoryIncrementalTasksRunner;
+import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
+import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
+import org.apache.shardingsphere.data.pipeline.core.util.CloseUtils;
 
 import java.util.Collection;
 
 /**
  * CDC tasks runner.
  */
-public final class CDCTasksRunner extends InventoryIncrementalTasksRunner {
+public final class CDCTasksRunner implements PipelineTasksRunner {
     
-    public CDCTasksRunner(final PipelineJobItemContext jobItemContext, final Collection<PipelineTask> inventoryTasks, final Collection<PipelineTask> incrementalTasks) {
-        super(jobItemContext, inventoryTasks, incrementalTasks);
+    private final InventoryIncrementalJobItemContext jobItemContext;
+    
+    private final Collection<PipelineTask> inventoryTasks;
+    
+    private final Collection<PipelineTask> incrementalTasks;
+    
+    public CDCTasksRunner(final InventoryIncrementalJobItemContext jobItemContext) {
+        this.jobItemContext = jobItemContext;
+        inventoryTasks = jobItemContext.getInventoryTasks();
+        incrementalTasks = jobItemContext.getIncrementalTasks();
+    }
+    
+    @Override
+    public PipelineJobItemContext getJobItemContext() {
+        return jobItemContext;
+    }
+    
+    @Override
+    public void start() {
     }
     
     @Override
-    protected void inventorySuccessCallback() {
-        executeIncrementalTask();
+    public void stop() {
+        jobItemContext.setStopping(true);
+        for (PipelineTask each : inventoryTasks) {
+            each.stop();
+            CloseUtils.closeQuietly(each);
+        }
+        for (PipelineTask each : incrementalTasks) {
+            each.stop();
+            CloseUtils.closeQuietly(each);
+        }
     }
 }
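
stop() above sets the stopping flag, then stops and quietly closes every task. The
close-quietly idiom it leans on is sketched below; the real CloseUtils.closeQuietly is
assumed to behave likewise, swallowing close failures so shutdown always completes:

    public final class StopAndCloseSketch {
        
        private interface Task extends AutoCloseable {
            
            void stop();
        }
        
        // Assumed behavior of CloseUtils.closeQuietly: never let close() break shutdown.
        private static void closeQuietly(final AutoCloseable closeable) {
            try {
                closeable.close();
            } catch (final Exception ignored) {
            }
        }
        
        public static void main(final String[] args) {
            Task task = new Task() {
                
                @Override
                public void stop() {
                    System.out.println("task stopped");
                }
                
                @Override
                public void close() {
                    throw new IllegalStateException("close failed");
                }
            };
            task.stop();
            closeQuietly(task);
            System.out.println("shutdown continues despite the close failure");
        }
    }
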
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/DataRecordComparatorGenerator.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/DataRecordComparatorGenerator.java
deleted file mode 100644
index 10bd3a9c6a2..00000000000
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/DataRecordComparatorGenerator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.generator;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
-
-import java.util.Comparator;
-
-/**
- * Data record comparator generator.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class DataRecordComparatorGenerator {
-    
-    /**
-     * Generate incremental comparator.
-     *
-     * @param databaseType database type
-     * @return data record comparator
-     */
-    public static Comparator<DataRecord> generatorIncrementalComparator(final DatabaseType databaseType) {
-        if (databaseType instanceof OpenGaussDatabaseType) {
-            return Comparator.comparing(DataRecord::getCsn, Comparator.nullsFirst(Comparator.naturalOrder()));
-        }
-        // TODO MySQL and PostgreSQL not support now
-        return null;
-    }
-}
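
The removed generator produced a null-safe comparator only for openGauss, since CSN
may be absent on other databases. Its nullsFirst composition in isolation, runnable:

    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;
    
    public final class NullSafeCsnComparatorSketch {
        
        public static void main(final String[] args) {
            // Records without a CSN sort first instead of throwing NullPointerException.
            Comparator<Long> comparator = Comparator.nullsFirst(Comparator.naturalOrder());
            List<Long> csns = Arrays.asList(3L, null, 1L);
            csns.sort(comparator);
            System.out.println(csns); // [null, 1, 3]
        }
    }
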
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 1d0751d8adc..69a58617e37 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -21,21 +21,20 @@ import com.google.common.base.Strings;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelId;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
 import org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
 import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
-import org.apache.shardingsphere.data.pipeline.cdc.context.job.CDCJobItemContext;
-import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
-import org.apache.shardingsphere.data.pipeline.cdc.core.connector.SocketSinkImporterConnector;
+import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckId;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporterManager;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.CDCSocketSink;
+import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
 import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
 import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCServerException;
 import org.apache.shardingsphere.data.pipeline.cdc.exception.NotFindStreamDataSourceTableException;
 import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
-import org.apache.shardingsphere.data.pipeline.cdc.generator.DataRecordComparatorGenerator;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
@@ -57,7 +56,6 @@ import org.apache.shardingsphere.single.rule.SingleRule;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -152,10 +150,7 @@ public final class CDCBackendHandler {
             PipelineJobCenter.stop(jobId);
         }
         ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
-        Comparator<DataRecord> dataRecordComparator = cdcJobConfig.isDecodeWithTX()
-                ? DataRecordComparatorGenerator.generatorIncrementalComparator(database.getProtocolType())
-                : null;
-        jobAPI.startJob(jobId, new SocketSinkImporterConnector(channel, database, cdcJobConfig.getJobShardingCount(), cdcJobConfig.getSchemaTableNames(), dataRecordComparator));
+        jobAPI.startJob(jobId, new CDCSocketSink(channel, database, cdcJobConfig.getSchemaTableNames()));
         connectionContext.setJobId(jobId);
     }
     
@@ -170,22 +165,14 @@ public final class CDCBackendHandler {
             log.warn("job id is null or empty, ignored");
             return;
         }
-        List<Integer> shardingItems = new ArrayList<>(PipelineJobCenter.getShardingItems(jobId));
-        if (shardingItems.isEmpty()) {
+        CDCJob job = (CDCJob) PipelineJobCenter.getJob(jobId);
+        if (null == job) {
             return;
         }
-        Optional<PipelineJobItemContext> jobItemContext = PipelineJobCenter.getJobItemContext(jobId, shardingItems.get(0));
-        if (!jobItemContext.isPresent()) {
-            return;
-        }
-        CDCJobItemContext cdcJobItemContext = (CDCJobItemContext) jobItemContext.get();
-        if (cdcJobItemContext.getImporterConnector() instanceof SocketSinkImporterConnector) {
-            Channel channel = (Channel) cdcJobItemContext.getImporterConnector().getConnector();
-            if (channelId.equals(channel.id())) {
-                log.info("close CDC job, channel id: {}", channelId);
-                PipelineJobCenter.stop(jobId);
-                jobAPI.updateJobConfigurationDisabled(jobId, true);
-            }
+        if (job.getSink().identifierMatched(channelId)) {
+            log.info("close CDC job, channel id: {}", channelId);
+            PipelineJobCenter.stop(jobId);
+            jobAPI.updateJobConfigurationDisabled(jobId, true);
         }
     }
     
@@ -214,6 +201,12 @@ public final class CDCBackendHandler {
      * @param requestBody request body
      */
     public void processAck(final AckStreamingRequestBody requestBody) {
-        CDCAckHolder.getInstance().ack(requestBody.getAckId());
+        CDCAckId ackId = CDCAckId.unmarshal(requestBody.getAckId());
+        CDCImporter importer = CDCImporterManager.getImporter(ackId.getImporterId());
+        if (null == importer) {
+            log.warn("Could not find importer, ack id: {}", ackId.marshal());
+            return;
+        }
+        importer.ack(ackId.marshal());
     }
 }
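
processAck now routes by the importer id embedded in the ack id. The commit's actual
CDCAckId encoding is not shown in this diff, so the marshal/unmarshal pair below is
purely hypothetical, just enough to illustrate the round trip the handler depends on:

    public final class AckIdSketch {
        
        // Hypothetical encoding "<importerId>:<sequence>"; the real CDCAckId format may differ.
        private static final class AckId {
            
            private final String importerId;
            
            private final long sequence;
            
            AckId(final String importerId, final long sequence) {
                this.importerId = importerId;
                this.sequence = sequence;
            }
            
            String marshal() {
                return importerId + ":" + sequence;
            }
            
            static AckId unmarshal(final String text) {
                String[] parts = text.split(":", 2);
                return new AckId(parts[0], Long.parseLong(parts[1]));
            }
            
            String getImporterId() {
                return importerId;
            }
        }
        
        public static void main(final String[] args) {
            String wire = new AckId("importer-1", 42L).marshal();
            // The handler unmarshals, looks up the importer by id, then acks with the original string.
            System.out.println(AckId.unmarshal(wire).getImporterId()); // importer-1
        }
    }
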
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtils.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtils.java
deleted file mode 100644
index 6189f2ba161..00000000000
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtils.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.util;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporter;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * CDC data record utility class.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class CDCDataRecordUtils {
-    
-    /**
-     * Find minimum data record and save position.
-     *
-     * @param incrementalRecordMap incremental record map.
-     * @param dataRecordComparator data record comparator.
-     * @param cdcAckPositionMap CDC ack position map.
-     * @return minimum data record
-     */
-    public static List<DataRecord> findMinimumDataRecordsAndSavePosition(final Map<SocketSinkImporter, BlockingQueue<List<DataRecord>>> incrementalRecordMap,
-                                                                         final Comparator<DataRecord> dataRecordComparator, final Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap) {
-        if (null == dataRecordComparator) {
-            return findMinimumDataRecordWithoutComparator(incrementalRecordMap, cdcAckPositionMap);
-        } else {
-            return findMinimumDataRecordWithComparator(incrementalRecordMap, cdcAckPositionMap, dataRecordComparator);
-        }
-    }
-    
-    private static List<DataRecord> findMinimumDataRecordWithoutComparator(final Map<SocketSinkImporter, BlockingQueue<List<DataRecord>>> incrementalRecordMap,
-                                                                           final Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap) {
-        for (Entry<SocketSinkImporter, BlockingQueue<List<DataRecord>>> entry : incrementalRecordMap.entrySet()) {
-            List<DataRecord> records = entry.getValue().poll();
-            if (null == records || records.isEmpty()) {
-                continue;
-            }
-            DataRecord lastRecord = records.get(records.size() - 1);
-            saveAckPosition(cdcAckPositionMap, entry.getKey(), lastRecord);
-            return records;
-        }
-        return Collections.emptyList();
-    }
-    
-    private static void saveAckPosition(final Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap, final SocketSinkImporter socketSinkImporter, final Record record) {
-        CDCAckPosition cdcAckPosition = cdcAckPositionMap.get(socketSinkImporter);
-        if (null == cdcAckPosition) {
-            cdcAckPositionMap.put(socketSinkImporter, new CDCAckPosition(record, 1));
-        } else {
-            cdcAckPosition.setLastRecord(record);
-            cdcAckPosition.addDataRecordCount(cdcAckPosition.getDataRecordCount());
-        }
-    }
-    
-    private static List<DataRecord> findMinimumDataRecordWithComparator(final Map<SocketSinkImporter, BlockingQueue<List<DataRecord>>> incrementalRecordMap,
-                                                                        final Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap, final Comparator<DataRecord> dataRecordComparator) {
-        Map<SocketSinkImporter, List<DataRecord>> waitSortedMap = new HashMap<>();
-        for (Entry<SocketSinkImporter, BlockingQueue<List<DataRecord>>> entry : incrementalRecordMap.entrySet()) {
-            List<DataRecord> peek = entry.getValue().peek();
-            if (null == peek) {
-                continue;
-            }
-            waitSortedMap.put(entry.getKey(), peek);
-        }
-        if (waitSortedMap.isEmpty()) {
-            return Collections.emptyList();
-        }
-        List<DataRecord> result = null;
-        SocketSinkImporter belongImporter = null;
-        for (Entry<SocketSinkImporter, List<DataRecord>> entry : waitSortedMap.entrySet()) {
-            if (null == result) {
-                result = entry.getValue();
-                belongImporter = entry.getKey();
-                continue;
-            }
-            if (dataRecordComparator.compare(result.get(0), entry.getValue().get(0)) > 0) {
-                result = entry.getValue();
-                belongImporter = entry.getKey();
-            }
-        }
-        if (null == result) {
-            return Collections.emptyList();
-        }
-        incrementalRecordMap.get(belongImporter).poll();
-        saveAckPosition(cdcAckPositionMap, belongImporter, result.get(result.size() - 1));
-        return result;
-    }
-}
diff --git a/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator b/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator
deleted file mode 100644
index 8b03ec4f243..00000000000
--- a/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporterCreator
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolderTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolderTest.java
deleted file mode 100644
index ffc722d6ad2..00000000000
--- a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolderTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.core.ack;
-
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporter;
-import org.apache.shardingsphere.infra.util.reflection.ReflectionUtils;
-import org.junit.jupiter.api.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-
-class CDCAckHolderTest {
-    
-    @Test
-    void assertBindAckIdWithPositionAndAck() {
-        CDCAckHolder cdcAckHolder = CDCAckHolder.getInstance();
-        final Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap = new HashMap<>();
-        SocketSinkImporter socketSinkImporter = mock(SocketSinkImporter.class);
-        importerDataRecordMap.put(socketSinkImporter, new CDCAckPosition(new FinishedRecord(new FinishedPosition()), 0));
-        Optional<Map<String, Map<SocketSinkImporter, CDCAckPosition>>> ackIdPositionMap = ReflectionUtils.getFieldValue(cdcAckHolder, "ackIdPositionMap");
-        assertTrue(ackIdPositionMap.isPresent());
-        assertTrue(ackIdPositionMap.get().isEmpty());
-        String ackId = cdcAckHolder.bindAckIdWithPosition(importerDataRecordMap);
-        assertThat(ackIdPositionMap.get().size(), is(1));
-        cdcAckHolder.ack(ackId);
-        assertTrue(ackIdPositionMap.get().isEmpty());
-    }
-    
-    @Test
-    void assertCleanUpTimeoutAckId() {
-        CDCAckHolder cdcAckHolder = CDCAckHolder.getInstance();
-        final Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap = new HashMap<>();
-        SocketSinkImporter socketSinkImporter = mock(SocketSinkImporter.class);
-        importerDataRecordMap.put(socketSinkImporter, new CDCAckPosition(new FinishedRecord(new FinishedPosition()), System.currentTimeMillis() - 60 * 1000 * 10));
-        cdcAckHolder.bindAckIdWithPosition(importerDataRecordMap);
-        cdcAckHolder.cleanUp(socketSinkImporter);
-        Optional<Map<String, Map<SocketSinkImporter, CDCAckPosition>>> ackIdPositionMap = ReflectionUtils.getFieldValue(cdcAckHolder, "ackIdPositionMap");
-        assertTrue(ackIdPositionMap.isPresent());
-        assertTrue(ackIdPositionMap.get().isEmpty());
-    }
-}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporterConnector.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckIdTest.java
similarity index 60%
rename from test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporterConnector.java
rename to kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckIdTest.java
index 0953a373026..e4cd727c2c4 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporterConnector.java
+++ b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckIdTest.java
@@ -15,19 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
+package org.apache.shardingsphere.data.pipeline.cdc.core.ack;
 
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import org.junit.jupiter.api.Test;
 
-public final class FixtureImporterConnector implements ImporterConnector {
-    
-    @Override
-    public Object getConnector() {
-        return null;
-    }
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+class CDCAckIdTest {
     
-    @Override
-    public String getType() {
-        return "FIXTURE";
+    @Test
+    void assertBuild() {
+        CDCAckId expected = CDCAckId.build("importer1");
+        String text = expected.marshal();
+        CDCAckId actual = CDCAckId.unmarshal(text);
+        assertThat(actual.getImporterId(), is(expected.getImporterId()));
+        assertThat(actual.getRandom(), is(expected.getRandom()));
     }
 }
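
Note: the CDCAckId class exercised by this test is added elsewhere in this commit and is not part of this excerpt. A minimal sketch consistent with the assertions above, assuming a simple delimiter-based text format (the committed implementation may differ):

    import lombok.Getter;
    import lombok.RequiredArgsConstructor;

    import java.util.UUID;

    // Hypothetical sketch: an ack id bound to one importer, plus a random suffix for uniqueness.
    @Getter
    @RequiredArgsConstructor
    public final class CDCAckId {

        private final String importerId;

        private final String random;

        public static CDCAckId build(final String importerId) {
            return new CDCAckId(importerId, UUID.randomUUID().toString().replace("-", ""));
        }

        public String marshal() {
            return importerId + "_" + random;
        }

        public static CDCAckId unmarshal(final String text) {
            // The random part contains no underscore, so split at the last one.
            int lastIndex = text.lastIndexOf('_');
            return new CDCAckId(text.substring(0, lastIndex), text.substring(lastIndex + 1));
        }
    }
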
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CSNRecordsComparatorTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CSNRecordsComparatorTest.java
new file mode 100644
index 00000000000..96587fb77a6
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CSNRecordsComparatorTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.importer;
+
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.PriorityQueue;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+
+class CSNRecordsComparatorTest {
+    
+    @Test
+    void assertSort() {
+        PipelineChannel channel = mock(PipelineChannel.class);
+        PriorityQueue<CSNRecords> queue = new PriorityQueue<>(new CSNRecordsComparator());
+        CDCChannelProgressPair channelProgressPair = new CDCChannelProgressPair(channel, mock(PipelineJobProgressListener.class));
+        queue.add(new CSNRecords(3L, channelProgressPair, Collections.emptyList()));
+        queue.add(new CSNRecords(1L, channelProgressPair, Collections.emptyList()));
+        queue.add(new CSNRecords(2L, channelProgressPair, Collections.emptyList()));
+        assertThat(queue.size(), is(3));
+        assertThat(queue.poll().getCsn(), is(1L));
+        assertThat(queue.poll().getCsn(), is(2L));
+        assertThat(queue.poll().getCsn(), is(3L));
+    }
+}
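
Note: CSNRecordsComparator itself is not shown in this excerpt. Given the assertions above, it presumably orders CSNRecords by ascending CSN (commit sequence number), so a PriorityQueue built on it always yields the oldest transaction first. A sketch under that assumption:

    import java.util.Comparator;

    // Hypothetical sketch: order CSNRecords batches by ascending commit sequence number.
    public final class CSNRecordsComparator implements Comparator<CSNRecords> {

        @Override
        public int compare(final CSNRecords o1, final CSNRecords o2) {
            return Long.compare(o1.getCsn(), o2.getCsn());
        }
    }
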
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreatorTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreatorTest.java
deleted file mode 100644
index 21c050be78f..00000000000
--- a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreatorTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.core.importer;
-
-import io.netty.channel.Channel;
-import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.cdc.core.connector.SocketSinkImporterConnector;
-import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
-import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-import java.util.Collections;
-
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.Mockito.mock;
-
-@ExtendWith(MockitoExtension.class)
-class SocketSinkImporterCreatorTest {
-    
-    @Mock
-    private ImporterConfiguration importerConfig;
-    
-    @Test
-    void assertCreateCDCImporter() {
-        SocketSinkImporterConnector importerConnector = new SocketSinkImporterConnector(mock(Channel.class), mock(ShardingSphereDatabase.class), 1, Collections.emptyList(), null);
-        assertThat(TypedSPILoader.getService(ImporterCreator.class, "Socket").createImporter(importerConfig, importerConnector, null, null, null), instanceOf(SocketSinkImporter.class));
-    }
-}
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/generator/DataRecordComparatorGeneratorTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/generator/DataRecordComparatorGeneratorTest.java
deleted file mode 100644
index 7ddd2b43e4f..00000000000
--- a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/generator/DataRecordComparatorGeneratorTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.generator;
-
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
-import org.junit.jupiter.api.Test;
-
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertNull;
-
-class DataRecordComparatorGeneratorTest {
-    
-    @Test
-    void assertGeneratorIncrementalComparator() {
-        Comparator<DataRecord> dataRecordComparator = DataRecordComparatorGenerator.generatorIncrementalComparator(new OpenGaussDatabaseType());
-        List<DataRecord> dataRecords = new LinkedList<>();
-        dataRecords.add(generateDataRecord(1L));
-        dataRecords.add(generateDataRecord(100L));
-        dataRecords.add(generateDataRecord(0L));
-        dataRecords.add(generateDataRecord(null));
-        dataRecords.sort(dataRecordComparator);
-        assertNull(dataRecords.get(0).getCsn());
-        assertThat(dataRecords.get(1).getCsn(), is(0L));
-        assertThat(dataRecords.get(2).getCsn(), is(1L));
-        assertThat(dataRecords.get(3).getCsn(), is(100L));
-    }
-    
-    private DataRecord generateDataRecord(final Long csn) {
-        DataRecord result = new DataRecord(new PlaceholderPosition(), 0);
-        result.setCsn(csn);
-        return result;
-    }
-}
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtilsTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtilsTest.java
deleted file mode 100644
index 514c948b6fb..00000000000
--- a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtilsTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.util;
-
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.SocketSinkImporter;
-import org.apache.shardingsphere.data.pipeline.cdc.generator.DataRecordComparatorGenerator;
-import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
-import org.junit.jupiter.api.Test;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-
-class CDCDataRecordUtilsTest {
-    
-    @Test
-    void assertFindMinimumDataRecordAndSavePosition() throws InterruptedException {
-        final Map<SocketSinkImporter, BlockingQueue<List<DataRecord>>> actualIncrementalRecordMap = new HashMap<>();
-        ArrayBlockingQueue<List<DataRecord>> queueFirst = new ArrayBlockingQueue<>(5);
-        queueFirst.put(Collections.singletonList(generateDataRecord(0)));
-        queueFirst.put(Collections.singletonList(generateDataRecord(2)));
-        queueFirst.put(Collections.singletonList(generateDataRecord(4)));
-        SocketSinkImporter mockSocketSinkImporterFirst = mock(SocketSinkImporter.class);
-        actualIncrementalRecordMap.put(mockSocketSinkImporterFirst, queueFirst);
-        ArrayBlockingQueue<List<DataRecord>> queueSecond = new ArrayBlockingQueue<>(5);
-        queueSecond.put(Collections.singletonList(generateDataRecord(1)));
-        queueSecond.put(Collections.singletonList(generateDataRecord(3)));
-        queueSecond.put(Collections.singletonList(generateDataRecord(5)));
-        SocketSinkImporter mockSocketSinkImporterSecond = mock(SocketSinkImporter.class);
-        actualIncrementalRecordMap.put(mockSocketSinkImporterSecond, queueSecond);
-        Comparator<DataRecord> dataRecordComparator = DataRecordComparatorGenerator.generatorIncrementalComparator(new OpenGaussDatabaseType());
-        final Map<SocketSinkImporter, CDCAckPosition> cdcAckPositionMap = new HashMap<>();
-        for (long i = 0; i <= 5; i++) {
-            List<DataRecord> minimumDataRecord = CDCDataRecordUtils.findMinimumDataRecordsAndSavePosition(actualIncrementalRecordMap, dataRecordComparator, cdcAckPositionMap);
-            assertThat(minimumDataRecord.size(), is(1));
-            assertThat(minimumDataRecord.get(0).getCsn(), is(i));
-        }
-        assertTrue(CDCDataRecordUtils.findMinimumDataRecordsAndSavePosition(actualIncrementalRecordMap, dataRecordComparator, cdcAckPositionMap).isEmpty());
-    }
-    
-    private DataRecord generateDataRecord(final long csn) {
-        DataRecord dataRecord = new DataRecord(new PlaceholderPosition(), 0);
-        dataRecord.setCsn(csn);
-        return dataRecord;
-    }
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
index c77a33219b2..c729ebed79d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
@@ -22,7 +22,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncreme
 import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import org.apache.shardingsphere.data.pipeline.spi.importer.sink.PipelineSink;
 
 import java.util.Collection;
 
@@ -63,11 +63,11 @@ public interface InventoryIncrementalJobItemContext extends PipelineJobItemConte
     PipelineTableMetaDataLoader getSourceMetaDataLoader();
     
     /**
-     * Get importer connector.
+     * Get sink.
      *
-     * @return importer connector
+     * @return sink
      */
-    ImporterConnector getImporterConnector();
+    PipelineSink getSink();
     
     /**
      * Get processed record count.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporterCreator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporterCreator.java
deleted file mode 100644
index 900d674bf8b..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporterCreator.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.importer;
-
-import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
-import org.apache.shardingsphere.data.pipeline.api.importer.ImporterType;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
-
-/**
- * Data source importer creator.
- */
-public final class DataSourceImporterCreator implements ImporterCreator {
-    
-    @Override
-    public Importer createImporter(final ImporterConfiguration importerConfig,
-                                   final ImporterConnector importerConnector, final PipelineChannel channel,
-                                   final PipelineJobProgressListener jobProgressListener, final ImporterType importerType) {
-        return new DataSourceImporter(importerConfig, importerConnector, channel, jobProgressListener);
-    }
-    
-    @Override
-    public String getType() {
-        return "DataSource";
-    }
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
new file mode 100644
index 00000000000..52819ef48a3
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/SingleChannelConsumerImporter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.importer;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.util.CloseUtils;
+import org.apache.shardingsphere.data.pipeline.spi.importer.sink.PipelineSink;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Single channel consumer importer.
+ */
+@RequiredArgsConstructor
+public final class SingleChannelConsumerImporter extends AbstractLifecycleExecutor implements Importer {
+    
+    private final PipelineChannel channel;
+    
+    private final int batchSize;
+    
+    private final int timeout;
+    
+    private final TimeUnit timeUnit;
+    
+    private final PipelineSink sink;
+    
+    private final PipelineJobProgressListener jobProgressListener;
+    
+    @Override
+    protected void runBlocking() {
+        while (isRunning()) {
+            List<Record> records = channel.fetchRecords(batchSize, timeout, timeUnit).stream().filter(each -> !(each instanceof PlaceholderRecord)).collect(Collectors.toList());
+            if (records.isEmpty()) {
+                continue;
+            }
+            PipelineJobProgressUpdatedParameter updatedParam = sink.write("", records);
+            channel.ack(records);
+            jobProgressListener.onProgressUpdated(updatedParam);
+            if (FinishedRecord.class.equals(records.get(records.size() - 1).getClass())) {
+                break;
+            }
+        }
+    }
+    
+    @Override
+    protected void doStop() {
+        CloseUtils.closeQuietly(sink);
+    }
+}
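
Note: the constructor arguments map one-to-one onto the fields above. A sketch of the expected wiring (values mirror the InventoryTaskSplitter change further below; the method name is a placeholder):

    import java.util.concurrent.TimeUnit;

    // Illustrative only; real construction happens in InventoryTaskSplitter (see below).
    static Importer newImporter(final PipelineChannel channel, final PipelineSink sink, final PipelineJobProgressListener listener) {
        // Fetch up to 1000 records per round, waiting at most 3 seconds for each fetch.
        return new SingleChannelConsumerImporter(channel, 1000, 3, TimeUnit.SECONDS, sink, listener);
    }
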
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
similarity index 85%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
index 2a662553ec0..13720675810 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.importer;
+package org.apache.shardingsphere.data.pipeline.core.importer.sink;
 
 import lombok.AccessLevel;
 import lombok.Getter;
@@ -23,22 +23,19 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
-import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
+import org.apache.shardingsphere.data.pipeline.core.importer.DataRecordMerger;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.core.record.RecordUtils;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
+import org.apache.shardingsphere.data.pipeline.spi.importer.sink.PipelineSink;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
@@ -52,7 +49,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -61,7 +57,7 @@ import java.util.stream.IntStream;
  * Default importer.
  */
 @Slf4j
-public final class DataSourceImporter extends AbstractLifecycleExecutor implements Importer {
+public final class PipelineDataSourceSink implements PipelineSink {
     
     private static final DataRecordMerger MERGER = new DataRecordMerger();
     
@@ -72,10 +68,6 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
     
     private final PipelineSQLBuilder pipelineSqlBuilder;
     
-    private final PipelineChannel channel;
-    
-    private final PipelineJobProgressListener jobProgressListener;
-    
     private final JobRateLimitAlgorithm rateLimitAlgorithm;
     
     private final AtomicReference<Statement> batchInsertStatement = new AtomicReference<>();
@@ -84,30 +76,21 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
     
     private final AtomicReference<Statement> batchDeleteStatement = new AtomicReference<>();
     
-    public DataSourceImporter(final ImporterConfiguration importerConfig, final ImporterConnector importerConnector, final PipelineChannel channel,
-                              final PipelineJobProgressListener jobProgressListener) {
+    public PipelineDataSourceSink(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager) {
         this.importerConfig = importerConfig;
         rateLimitAlgorithm = importerConfig.getRateLimitAlgorithm();
-        this.dataSourceManager = (PipelineDataSourceManager) importerConnector.getConnector();
-        this.channel = channel;
+        this.dataSourceManager = dataSourceManager;
         pipelineSqlBuilder = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, importerConfig.getDataSourceConfig().getDatabaseType().getType());
-        this.jobProgressListener = jobProgressListener;
     }
     
     @Override
-    protected void runBlocking() {
-        int batchSize = importerConfig.getBatchSize();
-        while (isRunning()) {
-            List<Record> records = channel.fetchRecords(batchSize, 3, TimeUnit.SECONDS);
-            if (!records.isEmpty()) {
-                PipelineJobProgressUpdatedParameter updatedParam = flush(dataSourceManager.getDataSource(importerConfig.getDataSourceConfig()), records);
-                channel.ack(records);
-                jobProgressListener.onProgressUpdated(updatedParam);
-                if (FinishedRecord.class.equals(records.get(records.size() - 1).getClass())) {
-                    break;
-                }
-            }
-        }
+    public boolean identifierMatched(final Object identifier) {
+        throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public PipelineJobProgressUpdatedParameter write(final String ackId, final List<Record> records) {
+        return flush(dataSourceManager.getDataSource(importerConfig.getDataSourceConfig()), records);
     }
     
     private PipelineJobProgressUpdatedParameter flush(final DataSource dataSource, final List<Record> buffer) {
@@ -140,7 +123,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
     
     @SneakyThrows(InterruptedException.class)
     private void tryFlush(final DataSource dataSource, final List<DataRecord> buffer) {
-        for (int i = 0; isRunning() && i <= importerConfig.getRetryTimes(); i++) {
+        for (int i = 0; !Thread.interrupted() && i <= importerConfig.getRetryTimes(); i++) {
             try {
                 doFlush(dataSource, buffer);
                 return;
@@ -310,9 +293,9 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
     }
     
     @Override
-    protected void doStop() throws SQLException {
-        cancelStatement(batchInsertStatement.get());
-        cancelStatement(updateStatement.get());
-        cancelStatement(batchDeleteStatement.get());
+    public void close() {
+        PipelineJdbcUtils.cancelStatement(batchInsertStatement.get());
+        PipelineJdbcUtils.cancelStatement(updateStatement.get());
+        PipelineJdbcUtils.cancelStatement(batchDeleteStatement.get());
     }
 }
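
Note: after this rename, channel consumption and progress listening move out of this class; it only needs the importer configuration and a data source manager. A condensed sketch of the resulting call pattern (the ack id is ignored by this JDBC sink; SingleChannelConsumerImporter above passes ""):

    // Sketch: construct the sink directly and write one batch.
    static PipelineJobProgressUpdatedParameter writeBatch(final ImporterConfiguration importerConfig,
                                                          final PipelineDataSourceManager dataSourceManager, final List<Record> records) {
        PipelineSink sink = new PipelineDataSourceSink(importerConfig, dataSourceManager);
        return sink.write("", records);
    }
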
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/AckCallbacks.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/AckCallbacks.java
new file mode 100644
index 00000000000..e8735baefbb
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/AckCallbacks.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.ingest.channel;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Ack callback utilities.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class AckCallbacks {
+    
+    /**
+     * Ack callback for inventory dump.
+     *
+     * @param records record list
+     * @param position ingest position
+     */
+    public static void inventoryCallback(final List<Record> records, final AtomicReference<IngestPosition> position) {
+        Record lastRecord = records.get(records.size() - 1);
+        position.set(lastRecord.getPosition());
+    }
+    
+    /**
+     * Ack callback for incremental dump.
+     *
+     * @param records record list
+     * @param progress incremental task progress
+     */
+    public static void incrementalCallback(final List<Record> records, final IncrementalTaskProgress progress) {
+        Record lastHandledRecord = records.get(records.size() - 1);
+        if (!(lastHandledRecord.getPosition() instanceof PlaceholderPosition)) {
+            progress.setPosition(lastHandledRecord.getPosition());
+            progress.getIncrementalTaskDelay().setLastEventTimestamps(lastHandledRecord.getCommitTime());
+        }
+        progress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
+    }
+}
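
Note: these helpers extract the ack lambdas that IncrementalTask previously built inline (compare the deleted createChannel body further below). A sketch of the incremental wiring, keeping the same createPipelineChannel call shape:

    // Sketch mirroring the deleted IncrementalTask.createChannel; the literal 5 is carried over from that code.
    static PipelineChannel createIncrementalChannel(final PipelineChannelCreator creator, final int concurrency, final IncrementalTaskProgress progress) {
        return creator.createPipelineChannel(concurrency, 5, records -> AckCallbacks.incrementalCallback(records, progress));
    }
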
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
index 2e09b725c51..e4e77fe118d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MultiplexMemoryPipelineChannel.java
@@ -82,10 +82,20 @@ public final class MultiplexMemoryPipelineChannel implements PipelineChannel {
     }
     
     @Override
-    public List<Record> fetchRecords(final int batchSize, final int timeout, final TimeUnit timeUnit) {
+    public List<Record> fetchRecords(final int batchSize, final long timeout, final TimeUnit timeUnit) {
         return findChannel().fetchRecords(batchSize, timeout, timeUnit);
     }
     
+    @Override
+    public List<Record> peekRecords() {
+        return findChannel().peekRecords();
+    }
+    
+    @Override
+    public List<Record> pollRecords() {
+        return findChannel().pollRecords();
+    }
+    
     @Override
     public void ack(final List<Record> records) {
         findChannel().ack(records);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
index 605c3d25849..82a5a8fa071 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.channel.AckCallback;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -51,25 +52,38 @@ public final class SimpleMemoryPipelineChannel implements PipelineChannel {
     @SneakyThrows(InterruptedException.class)
     // TODO thread-safe?
     @Override
-    public List<Record> fetchRecords(final int batchSize, final int timeout, final TimeUnit timeUnit) {
+    public List<Record> fetchRecords(final int batchSize, final long timeout, final TimeUnit timeUnit) {
         List<Record> result = new LinkedList<>();
-        long start = System.currentTimeMillis();
+        long startMillis = System.currentTimeMillis();
+        long timeoutMillis = timeUnit.toMillis(timeout);
         int recordsCount = 0;
-        while (batchSize > recordsCount) {
+        do {
             List<Record> records = queue.poll();
             if (null == records || records.isEmpty()) {
-                TimeUnit.MILLISECONDS.sleep(Math.min(100, timeUnit.toMillis(timeout)));
+                TimeUnit.MILLISECONDS.sleep(Math.min(100, timeoutMillis));
             } else {
                 recordsCount += records.size();
                 result.addAll(records);
             }
-            if (timeUnit.toMillis(timeout) <= System.currentTimeMillis() - start) {
+            if (recordsCount >= batchSize) {
                 break;
             }
-        }
+        } while (System.currentTimeMillis() - startMillis < timeoutMillis);
         return result;
     }
     
+    @Override
+    public List<Record> peekRecords() {
+        List<Record> result = queue.peek();
+        return null != result ? result : Collections.emptyList();
+    }
+    
+    @Override
+    public List<Record> pollRecords() {
+        List<Record> result = queue.poll();
+        return null != result ? result : Collections.emptyList();
+    }
+    
     @Override
     public void ack(final List<Record> records) {
         ackCallback.onAck(records);
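
Note: fetchRecords now takes a long timeout, returns as soon as the batch-size target is met, and still caps the total wait at the deadline; peekRecords and pollRecords add non-blocking access, presumably for the new CDC importer. Usage sketch:

    import java.util.List;
    import java.util.concurrent.TimeUnit;

    static void consume(final PipelineChannel channel) {
        // Blocks until roughly 1000 records are drained or 3 seconds have elapsed.
        List<Record> batch = channel.fetchRecords(1000, 3, TimeUnit.SECONDS);
        // Non-blocking: peek does not consume; poll removes and returns one pushed batch.
        List<Record> head = channel.peekRecords();
        List<Record> taken = channel.pollRecords();
    }
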
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 4c6f41b3ef7..576b437728f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -220,7 +220,7 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
     }
     
     @Override
-    protected void doStop() throws SQLException {
-        cancelStatement(dumpStatement.get());
+    protected void doStop() {
+        PipelineJdbcUtils.cancelStatement(dumpStatement.get());
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 452be0e1520..6d58c4c8fe7 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -103,7 +103,7 @@ public abstract class AbstractPipelineJob implements PipelineJob {
     
     protected abstract void doPrepare(PipelineJobItemContext jobItemContext) throws SQLException;
     
-    private void processFailed(final PipelineJobItemContext jobItemContext, final Exception ex) {
+    protected void processFailed(final PipelineJobItemContext jobItemContext, final Exception ex) {
         String jobId = jobItemContext.getJobId();
         log.error("job prepare failed, {}-{}", jobId, jobItemContext.getShardingItem(), ex);
         jobAPI.persistJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem(), ex);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
index bd46511735c..4826c302077 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
@@ -59,6 +59,16 @@ public final class PipelineJobCenter {
         return JOB_MAP.containsKey(jobId);
     }
     
+    /**
+     * Get job.
+     *
+     * @param jobId job id
+     * @return job
+     */
+    public static PipelineJob getJob(final String jobId) {
+        return JOB_MAP.get(jobId);
+    }
+    
     /**
      * Stop job.
      *
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index c6b67dd0720..116df621efb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -24,6 +24,9 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumper
 import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineReadConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.NoUniqueKeyPosition;
@@ -36,10 +39,12 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumn
 import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
+import org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
@@ -57,6 +62,8 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Inventory data task splitter.
@@ -80,16 +87,26 @@ public final class InventoryTaskSplitter {
     public List<InventoryTask> splitInventoryData(final InventoryIncrementalJobItemContext jobItemContext) {
         List<InventoryTask> result = new LinkedList<>();
         long startTimeMillis = System.currentTimeMillis();
-        PipelineChannelCreator pipelineChannelCreator = jobItemContext.getJobProcessContext().getPipelineChannelCreator();
-        for (InventoryDumperConfiguration each : splitDumperConfig(jobItemContext, dumperConfig)) {
-            result.add(new InventoryTask(each, importerConfig, pipelineChannelCreator, jobItemContext.getImporterConnector(), sourceDataSource, jobItemContext.getSourceMetaDataLoader(),
-                    jobItemContext.getJobProcessContext().getInventoryDumperExecuteEngine(), jobItemContext.getJobProcessContext().getInventoryImporterExecuteEngine(), jobItemContext));
+        InventoryIncrementalProcessContext processContext = jobItemContext.getJobProcessContext();
+        for (InventoryDumperConfiguration each : splitInventoryDumperConfig(jobItemContext)) {
+            AtomicReference<IngestPosition> position = new AtomicReference<>(each.getPosition());
+            PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(), importerConfig.getBatchSize(), position);
+            Dumper dumper = new InventoryDumper(each, channel, sourceDataSource, jobItemContext.getSourceMetaDataLoader());
+            Importer importer = new SingleChannelConsumerImporter(channel, importerConfig.getBatchSize(), 3, TimeUnit.SECONDS, jobItemContext.getSink(), jobItemContext);
+            result.add(new InventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(),
+                    processContext.getInventoryImporterExecuteEngine(), dumper, importer, position));
         }
         log.info("splitInventoryData cost {} ms", System.currentTimeMillis() - startTimeMillis);
         return result;
     }
     
-    private Collection<InventoryDumperConfiguration> splitDumperConfig(final InventoryIncrementalJobItemContext jobItemContext, final InventoryDumperConfiguration dumperConfig) {
+    /**
+     * Split inventory dumper configuration.
+     *
+     * @param jobItemContext job item context
+     * @return inventory dumper configurations
+     */
+    public Collection<InventoryDumperConfiguration> splitInventoryDumperConfig(final InventoryIncrementalJobItemContext jobItemContext) {
         Collection<InventoryDumperConfiguration> result = new LinkedList<>();
         for (InventoryDumperConfiguration each : splitByTable(dumperConfig)) {
             result.addAll(splitByPrimaryKey(each, jobItemContext, sourceDataSource));
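
Note: PipelineTaskUtils is not shown in this excerpt. Given the call above and the new AckCallbacks class, createInventoryChannel presumably wires inventoryCallback into a single-consumer channel; a sketch under that assumption (the second createPipelineChannel argument is assumed to be the importer batch size passed through by the splitter):

    // Hypothetical sketch of PipelineTaskUtils.createInventoryChannel; exact arguments are assumptions.
    static PipelineChannel createInventoryChannel(final PipelineChannelCreator creator, final int batchSize, final AtomicReference<IngestPosition> position) {
        return creator.createPipelineChannel(1, batchSize, records -> AckCallbacks.inventoryCallback(records, position));
    }
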
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index e028e86a492..db0d0d8c3cb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -21,39 +21,21 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
-import org.apache.shardingsphere.data.pipeline.api.importer.ImporterType;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
-import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
-import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 
 import java.util.Collection;
 import java.util.LinkedList;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 /**
  * Incremental task.
  */
+@RequiredArgsConstructor
 @Slf4j
-@ToString(exclude = {"incrementalExecuteEngine", "channel", "dumper", "importers", "taskProgress"})
+@ToString(exclude = {"incrementalExecuteEngine", "dumper", "importers", "taskProgress"})
 public final class IncrementalTask implements PipelineTask {
     
     @Getter
@@ -61,8 +43,6 @@ public final class IncrementalTask implements PipelineTask {
     
     private final ExecuteEngine incrementalExecuteEngine;
     
-    private final PipelineChannel channel;
-    
     private final Dumper dumper;
     
     private final Collection<Importer> importers;
@@ -70,57 +50,12 @@ public final class IncrementalTask implements PipelineTask {
     @Getter
     private final IncrementalTaskProgress taskProgress;
     
-    // TODO simplify parameters
-    public IncrementalTask(final int concurrency, final DumperConfiguration dumperConfig, final ImporterConfiguration importerConfig,
-                           final PipelineChannelCreator pipelineChannelCreator, final ImporterConnector importerConnector,
-                           final PipelineTableMetaDataLoader sourceMetaDataLoader, final ExecuteEngine incrementalExecuteEngine,
-                           final InventoryIncrementalJobItemContext jobItemContext) {
-        taskId = dumperConfig.getDataSourceName();
-        this.incrementalExecuteEngine = incrementalExecuteEngine;
-        IngestPosition position = dumperConfig.getPosition();
-        taskProgress = createIncrementalTaskProgress(position, jobItemContext.getInitProgress());
-        channel = createChannel(concurrency, pipelineChannelCreator, taskProgress);
-        dumper = PipelineTypedSPILoader.getDatabaseTypedService(
-                IncrementalDumperCreator.class, dumperConfig.getDataSourceConfig().getDatabaseType().getType()).createIncrementalDumper(dumperConfig, position, channel, sourceMetaDataLoader);
-        importers = createImporters(concurrency, importerConfig, importerConnector, channel, jobItemContext);
-    }
-    
-    private IncrementalTaskProgress createIncrementalTaskProgress(final IngestPosition position, final InventoryIncrementalJobItemProgress jobItemProgress) {
-        IncrementalTaskProgress result = new IncrementalTaskProgress(position);
-        if (null != jobItemProgress && null != jobItemProgress.getIncremental()) {
-            Optional.ofNullable(jobItemProgress.getIncremental().getIncrementalTaskProgress())
-                    .ifPresent(optional -> result.setIncrementalTaskDelay(jobItemProgress.getIncremental().getIncrementalTaskProgress().getIncrementalTaskDelay()));
-        }
-        return result;
-    }
-    
-    private Collection<Importer> createImporters(final int concurrency, final ImporterConfiguration importerConfig, final ImporterConnector importerConnector, final PipelineChannel channel,
-                                                 final PipelineJobProgressListener jobProgressListener) {
-        Collection<Importer> result = new LinkedList<>();
-        for (int i = 0; i < concurrency; i++) {
-            result.add(TypedSPILoader.getService(ImporterCreator.class, importerConnector.getType()).createImporter(importerConfig, importerConnector, channel, jobProgressListener,
-                    ImporterType.INCREMENTAL));
-        }
-        return result;
-    }
-    
-    private PipelineChannel createChannel(final int concurrency, final PipelineChannelCreator pipelineChannelCreator, final IncrementalTaskProgress progress) {
-        return pipelineChannelCreator.createPipelineChannel(concurrency, 5, records -> {
-            Record lastHandledRecord = records.get(records.size() - 1);
-            if (!(lastHandledRecord.getPosition() instanceof PlaceholderPosition)) {
-                progress.setPosition(lastHandledRecord.getPosition());
-                progress.getIncrementalTaskDelay().setLastEventTimestamps(lastHandledRecord.getCommitTime());
-            }
-            progress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
-        });
-    }
-    
     @Override
     public Collection<CompletableFuture<?>> start() {
         taskProgress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
         Collection<CompletableFuture<?>> result = new LinkedList<>();
-        result.add(incrementalExecuteEngine.submit(dumper, new JobExecuteCallback(taskId, "incremental dumper")));
-        importers.forEach(each -> result.add(incrementalExecuteEngine.submit(each, new JobExecuteCallback(taskId, "importer"))));
+        result.add(incrementalExecuteEngine.submit(dumper, new TaskExecuteCallback(this)));
+        importers.forEach(each -> result.add(incrementalExecuteEngine.submit(each, new TaskExecuteCallback(this))));
         return result;
     }
     
@@ -134,25 +69,5 @@ public final class IncrementalTask implements PipelineTask {
     
     @Override
     public void close() {
-        channel.close();
-    }
-    
-    @RequiredArgsConstructor
-    private class JobExecuteCallback implements ExecuteCallback {
-        
-        private final String taskId;
-        
-        private final String jobType;
-        
-        @Override
-        public void onSuccess() {
-        }
-        
-        @Override
-        public void onFailure(final Throwable throwable) {
-            log.error("{} on failure, task ID={}", jobType, taskId);
-            IncrementalTask.this.stop();
-            IncrementalTask.this.close();
-        }
     }
 }
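
For context, the refactored constructor simply accepts pre-built collaborators; a minimal sketch of the new wiring, assuming the dumper, importers and task progress are prepared beforehand (as in the MigrationJobPreparer change further below):

    import java.util.Collection;
    
    import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
    import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
    import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
    import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
    import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
    
    // Hypothetical helper, not part of the commit: shows the parameter order of the
    // @RequiredArgsConstructor-generated constructor (task id is the data source name).
    public final class IncrementalTaskExample {
        
        public static IncrementalTask newTask(final String dataSourceName, final ExecuteEngine incrementalExecuteEngine,
                                              final Dumper dumper, final Collection<Importer> importers, final IncrementalTaskProgress taskProgress) {
            return new IncrementalTask(dataSourceName, incrementalExecuteEngine, dumper, importers, taskProgress);
        }
    }
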
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
index e5dc2fceb3b..5f10edf1712 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
@@ -20,15 +20,16 @@ package org.apache.shardingsphere.data.pipeline.core.task;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
+import org.apache.shardingsphere.data.pipeline.core.util.CloseUtils;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 
 import java.util.Collection;
@@ -43,7 +44,7 @@ import java.util.concurrent.CompletableFuture;
 public class InventoryIncrementalTasksRunner implements PipelineTasksRunner {
     
     @Getter
-    private final PipelineJobItemContext jobItemContext;
+    private final InventoryIncrementalJobItemContext jobItemContext;
     
     private final Collection<PipelineTask> inventoryTasks;
     
@@ -51,10 +52,10 @@ public class InventoryIncrementalTasksRunner implements PipelineTasksRunner {
     
     private final PipelineJobAPI jobAPI;
     
-    public InventoryIncrementalTasksRunner(final PipelineJobItemContext jobItemContext, final Collection<PipelineTask> inventoryTasks, final Collection<PipelineTask> incrementalTasks) {
+    public InventoryIncrementalTasksRunner(final InventoryIncrementalJobItemContext jobItemContext) {
         this.jobItemContext = jobItemContext;
-        this.inventoryTasks = inventoryTasks;
-        this.incrementalTasks = incrementalTasks;
+        inventoryTasks = jobItemContext.getInventoryTasks();
+        incrementalTasks = jobItemContext.getIncrementalTasks();
         jobAPI = TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getTypeName());
     }
     
@@ -63,11 +64,11 @@ public class InventoryIncrementalTasksRunner implements PipelineTasksRunner {
         jobItemContext.setStopping(true);
         for (PipelineTask each : inventoryTasks) {
             each.stop();
-            each.close();
+            CloseUtils.closeQuietly(each);
         }
         for (PipelineTask each : incrementalTasks) {
             each.stop();
-            each.close();
+            CloseUtils.closeQuietly(each);
         }
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 4adad77d972..22a6471effb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -18,28 +18,15 @@
 package org.apache.shardingsphere.data.pipeline.core.task;
 
 import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
-import org.apache.shardingsphere.data.pipeline.api.importer.ImporterType;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
-import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
-import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 
-import javax.sql.DataSource;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.concurrent.CompletableFuture;
@@ -48,7 +35,8 @@ import java.util.concurrent.atomic.AtomicReference;
 /**
  * Inventory task.
  */
-@ToString(exclude = {"inventoryDumperExecuteEngine", "inventoryImporterExecuteEngine", "channel", "dumper", "importer"})
+@RequiredArgsConstructor
+@ToString(exclude = {"inventoryDumperExecuteEngine", "inventoryImporterExecuteEngine", "dumper", "importer"})
 @Slf4j
 public final class InventoryTask implements PipelineTask {
     
@@ -59,73 +47,20 @@ public final class InventoryTask implements PipelineTask {
     
     private final ExecuteEngine inventoryImporterExecuteEngine;
     
-    private final PipelineChannel channel;
-    
     private final Dumper dumper;
     
     private final Importer importer;
     
     private final AtomicReference<IngestPosition> position;
     
-    public InventoryTask(final InventoryDumperConfiguration inventoryDumperConfig, final ImporterConfiguration importerConfig,
-                         final PipelineChannelCreator pipelineChannelCreator, final ImporterConnector importerConnector,
-                         final DataSource sourceDataSource, final PipelineTableMetaDataLoader sourceMetaDataLoader,
-                         final ExecuteEngine inventoryDumperExecuteEngine, final ExecuteEngine inventoryImporterExecuteEngine,
-                         final PipelineJobProgressListener jobProgressListener) {
-        taskId = generateTaskId(inventoryDumperConfig);
-        this.inventoryDumperExecuteEngine = inventoryDumperExecuteEngine;
-        this.inventoryImporterExecuteEngine = inventoryImporterExecuteEngine;
-        channel = createChannel(pipelineChannelCreator, importerConfig.getBatchSize());
-        dumper = new InventoryDumper(inventoryDumperConfig, channel, sourceDataSource, sourceMetaDataLoader);
-        importer = TypedSPILoader.getService(ImporterCreator.class,
-                importerConnector.getType()).createImporter(importerConfig, importerConnector, channel, jobProgressListener, ImporterType.INVENTORY);
-        position = new AtomicReference<>(inventoryDumperConfig.getPosition());
-    }
-    
-    private String generateTaskId(final InventoryDumperConfiguration inventoryDumperConfig) {
-        String result = String.format("%s.%s", inventoryDumperConfig.getDataSourceName(), inventoryDumperConfig.getActualTableName());
-        return null == inventoryDumperConfig.getShardingItem() ? result : result + "#" + inventoryDumperConfig.getShardingItem();
-    }
-    
     @Override
     public Collection<CompletableFuture<?>> start() {
         Collection<CompletableFuture<?>> result = new LinkedList<>();
-        result.add(inventoryDumperExecuteEngine.submit(dumper, new ExecuteCallback() {
-            
-            @Override
-            public void onSuccess() {
-            }
-            
-            @Override
-            public void onFailure(final Throwable throwable) {
-                log.error("dumper onFailure, taskId={}", taskId);
-                stop();
-                close();
-            }
-        }));
-        result.add(inventoryImporterExecuteEngine.submit(importer, new ExecuteCallback() {
-            
-            @Override
-            public void onSuccess() {
-            }
-            
-            @Override
-            public void onFailure(final Throwable throwable) {
-                log.error("importer onFailure, taskId={}", taskId);
-                stop();
-                close();
-            }
-        }));
+        result.add(inventoryDumperExecuteEngine.submit(dumper, new TaskExecuteCallback(this)));
+        result.add(inventoryImporterExecuteEngine.submit(importer, new TaskExecuteCallback(this)));
         return result;
     }
     
-    private PipelineChannel createChannel(final PipelineChannelCreator pipelineChannelCreator, final int batchSize) {
-        return pipelineChannelCreator.createPipelineChannel(1, batchSize, records -> {
-            Record lastRecord = records.get(records.size() - 1);
-            position.set(lastRecord.getPosition());
-        });
-    }
-    
     @Override
     public void stop() {
         dumper.stop();
@@ -139,6 +74,5 @@ public final class InventoryTask implements PipelineTask {
     
     @Override
     public void close() {
-        channel.close();
     }
 }
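
Likewise for the inventory side, a minimal sketch of the simplified construction, assuming the dumper and importer are already bound to a shared channel (mirroring the GovernanceRepositoryAPIImplTest change further below):

    import java.util.concurrent.atomic.AtomicReference;
    
    import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
    import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
    import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
    import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
    import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
    import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
    
    // Hypothetical helper, not part of the commit: the position reference starts at a
    // placeholder and is advanced by the channel's ack callback.
    public final class InventoryTaskExample {
        
        public static InventoryTask newTask(final String taskId, final ExecuteEngine dumperEngine, final ExecuteEngine importerEngine,
                                            final Dumper dumper, final Importer importer) {
            return new InventoryTask(taskId, dumperEngine, importerEngine, dumper, importer,
                    new AtomicReference<IngestPosition>(new PlaceholderPosition()));
        }
    }
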
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
index 1b090ee9b2a..b0000fe97d6 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
@@ -19,13 +19,14 @@ package org.apache.shardingsphere.data.pipeline.core.task;
 
 import org.apache.shardingsphere.data.pipeline.api.task.progress.TaskProgress;
 
+import java.io.Closeable;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 /**
  * Pipeline task interface.
  */
-public interface PipelineTask {
+public interface PipelineTask extends Closeable {
     
     /**
      * Start task.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
new file mode 100644
index 00000000000..15decabf3e0
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.task;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
+import org.apache.shardingsphere.data.pipeline.core.ingest.channel.AckCallbacks;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Pipeline task utilities.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class PipelineTaskUtils {
+    
+    /**
+     * Generate inventory task id.
+     *
+     * @param inventoryDumperConfig inventory dumper configuration
+     * @return inventory task id
+     */
+    public static String generateInventoryTaskId(final InventoryDumperConfiguration inventoryDumperConfig) {
+        String result = String.format("%s.%s", inventoryDumperConfig.getDataSourceName(), inventoryDumperConfig.getActualTableName());
+        return null == inventoryDumperConfig.getShardingItem() ? result : result + "#" + inventoryDumperConfig.getShardingItem();
+    }
+    
+    /**
+     * Create incremental task progress.
+     *
+     * @param position ingest position
+     * @param jobItemProgress job item progress
+     * @return incremental task progress
+     */
+    public static IncrementalTaskProgress createIncrementalTaskProgress(final IngestPosition position, final InventoryIncrementalJobItemProgress jobItemProgress) {
+        IncrementalTaskProgress result = new IncrementalTaskProgress(position);
+        if (null != jobItemProgress && null != jobItemProgress.getIncremental()) {
+            Optional.ofNullable(jobItemProgress.getIncremental().getIncrementalTaskProgress())
+                    .ifPresent(optional -> result.setIncrementalTaskDelay(jobItemProgress.getIncremental().getIncrementalTaskProgress().getIncrementalTaskDelay()));
+        }
+        return result;
+    }
+    
+    /**
+     * Create channel for inventory task.
+     *
+     * @param pipelineChannelCreator channel creator
+     * @param averageElementSize average element size
+     * @param position ingest position
+     * @return channel
+     */
+    public static PipelineChannel createInventoryChannel(final PipelineChannelCreator pipelineChannelCreator, final int averageElementSize, final AtomicReference<IngestPosition> position) {
+        return pipelineChannelCreator.createPipelineChannel(1, averageElementSize, records -> AckCallbacks.inventoryCallback(records, position));
+    }
+    
+    /**
+     * Create incremental channel.
+     *
+     * @param concurrency output concurrency
+     * @param pipelineChannelCreator channel creator
+     * @param progress incremental task progress
+     * @return channel
+     */
+    public static PipelineChannel createIncrementalChannel(final int concurrency, final PipelineChannelCreator pipelineChannelCreator, final IncrementalTaskProgress progress) {
+        return pipelineChannelCreator.createPipelineChannel(concurrency, 5, records -> AckCallbacks.incrementalCallback(records, progress));
+    }
+}
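
A minimal usage sketch of the two incremental helpers above, assuming the dumper configuration and channel creator come from the job context:

    import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
    import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
    import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
    import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
    import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
    import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
    
    // Hypothetical helper, not part of the commit: progress is created first so that
    // the channel's ack callback can keep it up to date.
    public final class IncrementalChannelExample {
        
        public static PipelineChannel newChannel(final DumperConfiguration dumperConfig, final InventoryIncrementalJobItemProgress initProgress,
                                                 final PipelineChannelCreator creator, final int concurrency) {
            IncrementalTaskProgress progress = PipelineTaskUtils.createIncrementalTaskProgress(dumperConfig.getPosition(), initProgress);
            return PipelineTaskUtils.createIncrementalChannel(concurrency, creator, progress);
        }
    }
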
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/connector/DataSourceImporterConnector.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/TaskExecuteCallback.java
similarity index 62%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/connector/DataSourceImporterConnector.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/TaskExecuteCallback.java
index cc336523d4c..12d63575633 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/connector/DataSourceImporterConnector.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/TaskExecuteCallback.java
@@ -15,28 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.importer.connector;
+package org.apache.shardingsphere.data.pipeline.core.task;
 
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.IOUtils;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 
 /**
- * Default importer connector.
+ * Task execute callback.
  */
-
+@Slf4j
 @RequiredArgsConstructor
-public final class DataSourceImporterConnector implements ImporterConnector {
+public final class TaskExecuteCallback implements ExecuteCallback {
     
-    private final PipelineDataSourceManager pipelineDataSourceManager;
+    private final PipelineTask task;
     
     @Override
-    public Object getConnector() {
-        return pipelineDataSourceManager;
+    public void onSuccess() {
     }
     
     @Override
-    public String getType() {
-        return "DataSource";
+    public void onFailure(final Throwable throwable) {
+        log.error("onFailure, task ID={}", task.getTaskId(), throwable);
+        task.stop();
+        IOUtils.closeQuietly(task);
     }
 }
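
A minimal sketch of the submission pattern this callback enables, as used by both task implementations above (the variable wiring is illustrative only):

    import java.util.Collection;
    import java.util.LinkedList;
    import java.util.concurrent.CompletableFuture;
    
    import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
    import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
    import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
    import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
    import org.apache.shardingsphere.data.pipeline.core.task.TaskExecuteCallback;
    
    // Hypothetical helper, not part of the commit: every runnable of a task is submitted
    // with a callback bound to that task, so a failure in any of them stops the task.
    public final class TaskSubmitExample {
        
        public static Collection<CompletableFuture<?>> startAll(final ExecuteEngine engine, final PipelineTask task,
                                                                final Dumper dumper, final Collection<Importer> importers) {
            Collection<CompletableFuture<?>> result = new LinkedList<>();
            result.add(engine.submit(dumper, new TaskExecuteCallback(task)));
            importers.forEach(each -> result.add(engine.submit(each, new TaskExecuteCallback(task))));
            return result;
        }
    }
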
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java
index facdd78e1ab..121c0594d98 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java
@@ -19,7 +19,10 @@ package org.apache.shardingsphere.data.pipeline.core.util;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.SQLWrapperException;
 
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.sql.Types;
 
 /**
@@ -75,4 +78,21 @@ public final class PipelineJdbcUtils {
                 return false;
         }
     }
+    
+    /**
+     * Cancel statement.
+     *
+     * @param statement statement
+     * @throws SQLWrapperException if cancelling the statement fails
+     */
+    public static void cancelStatement(final Statement statement) throws SQLWrapperException {
+        try {
+            if (null == statement || statement.isClosed()) {
+                return;
+            }
+            statement.cancel();
+        } catch (final SQLException ex) {
+            throw new SQLWrapperException(ex);
+        }
+    }
 }
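
A minimal sketch of the intended call site for cancelStatement, assuming a dumper keeps its running statement in an AtomicReference so that stop() can cancel it from another thread:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.util.concurrent.atomic.AtomicReference;
    
    import javax.sql.DataSource;
    
    import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
    
    // Hypothetical illustration, not part of the commit.
    public final class CancellableQueryExample {
        
        private final AtomicReference<PreparedStatement> runningStatement = new AtomicReference<>();
        
        public void run(final DataSource dataSource, final String sql) throws Exception {
            try (Connection connection = dataSource.getConnection();
                 PreparedStatement statement = connection.prepareStatement(sql)) {
                runningStatement.set(statement);
                statement.executeQuery();
            } finally {
                runningStatement.set(null);
            }
        }
        
        public void stop() {
            // Safe to call concurrently: cancelStatement ignores null or already closed
            // statements and wraps any SQLException into SQLWrapperException.
            PipelineJdbcUtils.cancelStatement(runningStatement.get());
        }
    }
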
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreator.java
deleted file mode 100644
index c0f5cce078c..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.spi.importer;
-
-import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
-import org.apache.shardingsphere.data.pipeline.api.importer.ImporterType;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
-import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
-
-/**
- * Importer creator.
- */
-@SingletonSPI
-public interface ImporterCreator extends TypedSPI {
-    
-    /**
-     * Create importer.
-     *
-     * @param importerConfig importer configuration
-     * @param importerConnector import connector
-     * @param channel channel
-     * @param jobProgressListener job progress listener
-     * @param importerType importer type
-     * @return importer
-     */
-    Importer createImporter(ImporterConfiguration importerConfig, ImporterConnector importerConnector, PipelineChannel channel, PipelineJobProgressListener jobProgressListener,
-                            ImporterType importerType);
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/sink/PipelineSink.java
similarity index 53%
copy from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
copy to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/sink/PipelineSink.java
index 1b090ee9b2a..e3a703c8a7c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/sink/PipelineSink.java
@@ -15,46 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.task;
+package org.apache.shardingsphere.data.pipeline.spi.importer.sink;
 
-import org.apache.shardingsphere.data.pipeline.api.task.progress.TaskProgress;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
 
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
+import java.io.Closeable;
+import java.util.List;
 
 /**
- * Pipeline task interface.
+ * Pipeline sink.
  */
-public interface PipelineTask {
+public interface PipelineSink extends Closeable {
     
     /**
-     * Start task.
+     * Judge whether the identifier matches this sink.
      *
-     * @return future
+     * @param identifier sink identifier
+     * @return true if matched, otherwise false
      */
-    Collection<CompletableFuture<?>> start();
+    boolean identifierMatched(Object identifier);
     
     /**
-     * Stop task.
-     */
-    void stop();
-    
-    /**
-     * Get task id.
-     *
-     * @return task id
-     */
-    String getTaskId();
-    
-    /**
-     * Get task progress.
+     * Write data.
      *
-     * @return task progress
-     */
-    TaskProgress getTaskProgress();
-    
-    /**
-     * Close.
+     * @param ackId ack id
+     * @param records records
+     * @return job progress updated parameter
      */
-    void close();
+    PipelineJobProgressUpdatedParameter write(String ackId, List<Record> records);
 }
diff --git a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator
deleted file mode 100644
index 473c86a363d..00000000000
--- a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.core.importer.DataSourceImporterCreator
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index ff598d5142f..c87c5a773ba 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -67,8 +67,7 @@ public final class MigrationJob extends AbstractSimplePipelineJob {
     
     @Override
     protected PipelineTasksRunner buildPipelineTasksRunner(final PipelineJobItemContext pipelineJobItemContext) {
-        InventoryIncrementalJobItemContext jobItemContext = (InventoryIncrementalJobItemContext) pipelineJobItemContext;
-        return new InventoryIncrementalTasksRunner(jobItemContext, jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
+        return new InventoryIncrementalTasksRunner((InventoryIncrementalJobItemContext) pipelineJobItemContext);
     }
     
     @Override
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
index 2643b37ddd4..e00f527cfbb 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
@@ -29,13 +29,13 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncreme
 import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
-import org.apache.shardingsphere.data.pipeline.core.importer.connector.DataSourceImporterConnector;
+import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineDataSourceSink;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import org.apache.shardingsphere.data.pipeline.spi.importer.sink.PipelineSink;
 
 import java.util.Collection;
 import java.util.LinkedList;
@@ -124,10 +124,9 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
         return sourceMetaDataLoaderLazyInitializer.get();
     }
     
-    // TODO Use SPI, configurable
     @Override
-    public ImporterConnector getImporterConnector() {
-        return new DataSourceImporterConnector(dataSourceManager);
+    public PipelineSink getSink() {
+        return new PipelineDataSourceSink(taskConfig.getImporterConfig(), dataSourceManager);
     }
     
     /**
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index f34fe078082..be3ff4883b6 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -19,19 +19,27 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration.prepare;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobOffsetInfo;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.prepare.InventoryTaskSplitter;
@@ -39,11 +47,15 @@ import org.apache.shardingsphere.data.pipeline.core.prepare.PipelineJobPreparerU
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetSchemasParameter;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
+import org.apache.shardingsphere.data.pipeline.spi.importer.sink.PipelineSink;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
 import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.lock.GlobalLockNames;
@@ -56,8 +68,11 @@ import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.
 import org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
 
 import java.sql.SQLException;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Migration job preparer.
@@ -86,6 +101,8 @@ public final class MigrationJobPreparer {
             PipelineJobCenter.stop(jobItemContext.getJobId());
             return;
         }
+        prepareIncremental(jobItemContext);
+        initInventoryTasks(jobItemContext);
         if (PipelineJobPreparerUtils.isIncrementalSupported(jobItemContext.getJobConfig().getSourceDatabaseType())) {
             initIncrementalTasks(jobItemContext);
             if (jobItemContext.isStopping()) {
@@ -93,7 +110,6 @@ public final class MigrationJobPreparer {
                 return;
             }
         }
-        initInventoryTasks(jobItemContext);
         log.info("prepare, jobId={}, shardingItem={}, inventoryTasks={}, incrementalTasks={}",
                 jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
     }
@@ -133,8 +149,7 @@ public final class MigrationJobPreparer {
         }
         InventoryIncrementalJobItemProgress initProgress = jobItemContext.getInitProgress();
         if (null == initProgress) {
-            PipelineDataSourceWrapper targetDataSource = ((PipelineDataSourceManager) jobItemContext.getImporterConnector().getConnector())
-                    .getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
+            PipelineDataSourceWrapper targetDataSource = jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
             PipelineJobPreparerUtils.checkTargetDataSource(jobItemContext.getJobConfig().getTargetDatabaseType(), jobItemContext.getTaskConfig().getImporterConfig(),
                     Collections.singletonList(targetDataSource));
         }
@@ -144,7 +159,7 @@ public final class MigrationJobPreparer {
         MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
         String targetDatabaseType = jobConfig.getTargetDatabaseType();
         CreateTableConfiguration createTableConfig = jobItemContext.getTaskConfig().getCreateTableConfig();
-        PipelineDataSourceManager dataSourceManager = (PipelineDataSourceManager) jobItemContext.getImporterConnector().getConnector();
+        PipelineDataSourceManager dataSourceManager = jobItemContext.getDataSourceManager();
         PrepareTargetSchemasParameter prepareTargetSchemasParam = new PrepareTargetSchemasParameter(
                 PipelineTypedSPILoader.getDatabaseTypedService(DatabaseType.class, targetDatabaseType), createTableConfig, dataSourceManager);
         PipelineJobPreparerUtils.prepareTargetSchema(targetDatabaseType, prepareTargetSchemasParam);
@@ -153,6 +168,16 @@ public final class MigrationJobPreparer {
         PipelineJobPreparerUtils.prepareTargetTables(targetDatabaseType, new PrepareTargetTablesParameter(createTableConfig, dataSourceManager, sqlParserEngine));
     }
     
+    private void prepareIncremental(final MigrationJobItemContext jobItemContext) {
+        MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
+        JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
+        try {
+            taskConfig.getDumperConfig().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, taskConfig.getDumperConfig(), jobItemContext.getDataSourceManager()));
+        } catch (final SQLException ex) {
+            throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
+        }
+    }
+    
     private void initInventoryTasks(final MigrationJobItemContext jobItemContext) {
         InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(jobItemContext.getTaskConfig().getDumperConfig());
         InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), inventoryDumperConfig, jobItemContext.getTaskConfig().getImporterConfig());
@@ -161,21 +186,30 @@ public final class MigrationJobPreparer {
     
     private void initIncrementalTasks(final MigrationJobItemContext jobItemContext) {
         MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
-        PipelineDataSourceManager dataSourceManager = (PipelineDataSourceManager) jobItemContext.getImporterConnector().getConnector();
-        JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
-        try {
-            taskConfig.getDumperConfig().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, taskConfig.getDumperConfig(), dataSourceManager));
-        } catch (final SQLException ex) {
-            throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
-        }
         PipelineChannelCreator pipelineChannelCreator = jobItemContext.getJobProcessContext().getPipelineChannelCreator();
         PipelineTableMetaDataLoader sourceMetaDataLoader = jobItemContext.getSourceMetaDataLoader();
+        DumperConfiguration dumperConfig = taskConfig.getDumperConfig();
+        ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
         ExecuteEngine incrementalExecuteEngine = jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
-        IncrementalTask incrementalTask = new IncrementalTask(taskConfig.getImporterConfig().getConcurrency(), taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
-                pipelineChannelCreator, jobItemContext.getImporterConnector(), sourceMetaDataLoader, incrementalExecuteEngine, jobItemContext);
+        IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperConfig.getPosition(), jobItemContext.getInitProgress());
+        PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(importerConfig.getConcurrency(), pipelineChannelCreator, taskProgress);
+        Dumper dumper = PipelineTypedSPILoader.getDatabaseTypedService(IncrementalDumperCreator.class, dumperConfig.getDataSourceConfig().getDatabaseType().getType())
+                .createIncrementalDumper(dumperConfig, dumperConfig.getPosition(), channel, sourceMetaDataLoader);
+        Collection<Importer> importers = createImporters(importerConfig, jobItemContext.getSink(), channel, jobItemContext);
+        PipelineTask incrementalTask = new IncrementalTask(dumperConfig.getDataSourceName(), incrementalExecuteEngine, dumper, importers, taskProgress);
         jobItemContext.getIncrementalTasks().add(incrementalTask);
     }
     
+    private Collection<Importer> createImporters(final ImporterConfiguration importerConfig, final PipelineSink sink, final PipelineChannel channel,
+                                                 final PipelineJobProgressListener jobProgressListener) {
+        Collection<Importer> result = new LinkedList<>();
+        for (int i = 0; i < importerConfig.getConcurrency(); i++) {
+            Importer importer = new SingleChannelConsumerImporter(channel, importerConfig.getBatchSize(), 3, TimeUnit.SECONDS, sink, jobProgressListener);
+            result.add(importer);
+        }
+        return result;
+    }
+    
     /**
      * Do cleanup work.
      *
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index bde28c399e8..92f80e1bacf 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -205,7 +205,8 @@ public final class PipelineContainerComposer implements AutoCloseable {
             return new JdbcUrlAppender().appendQueryProperties(jdbcUrl, PropertiesBuilder.build(new Property("rewriteBatchedStatements", Boolean.TRUE.toString())));
         }
         if (DatabaseTypeUtils.isPostgreSQL(databaseType) || DatabaseTypeUtils.isOpenGauss(databaseType)) {
-            return new JdbcUrlAppender().appendQueryProperties(jdbcUrl, PropertiesBuilder.build(new Property("stringtype", "unspecified"), new Property("bitToString", Boolean.TRUE.toString())));
+            return new JdbcUrlAppender().appendQueryProperties(jdbcUrl, PropertiesBuilder.build(new Property("stringtype", "unspecified"),
+                    new Property("bitToString", Boolean.TRUE.toString()), new Property("TimeZone", "UTC")));
         }
         return jdbcUrl;
     }
@@ -383,8 +384,10 @@ public final class PipelineContainerComposer implements AutoCloseable {
             try (Connection connection = proxyDataSource.getConnection()) {
                 ResultSet resultSet = connection.createStatement().executeQuery(sql);
                 return transformResultSetToList(resultSet);
-            } catch (final SQLException ex) {
-                log.error("Data access error: ", ex);
+                // CHECKSTYLE:OFF
+            } catch (final SQLException | RuntimeException ex) {
+                // CHECKSTYLE:ON
+                log.error("Data access error, sql: {}.", sql, ex);
             }
             Awaitility.await().pollDelay(3L, TimeUnit.SECONDS).until(() -> true);
             retryNumber++;
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 98393d6cdfe..b3cfafdfbdc 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -183,7 +183,7 @@ class CDCE2EIT {
         DataSourceRecordConsumer recordConsumer = new DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
         CompletableFuture.runAsync(() -> new CDCClient(parameter, recordConsumer).start(), executor).whenComplete((unused, throwable) -> {
             if (null != throwable) {
-                log.error("cdc client sync failed, ", throwable);
+                log.error("cdc client sync failed", throwable);
             }
         });
     }
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
index f9d837f20e7..bc646949b34 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
@@ -66,6 +66,7 @@ public final class DataSourceRecordConsumer implements Consumer<List<Record>> {
     
     @Override
     public void accept(final List<Record> records) {
+        log.info("Records count: {}", records.size());
         log.debug("Records: {}", records);
         try (Connection connection = dataSource.getConnection()) {
             connection.setAutoCommit(false);
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index e1b6c31d9d9..155e9cc3e0a 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -21,22 +21,18 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
-import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
-import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.importer.connector.DataSourceImporterConnector;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
-import org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixtureInventoryIncrementalJobItemContext;
 import org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
 import org.junit.jupiter.api.BeforeAll;
@@ -153,13 +149,7 @@ class GovernanceRepositoryAPIImplTest {
         dumperConfig.setLogicTableName("t_order");
         dumperConfig.setUniqueKeyColumns(Collections.singletonList(PipelineContextUtils.mockOrderIdColumnMetaData()));
         dumperConfig.setShardingItem(0);
-        PipelineDataSourceWrapper dataSource = mock(PipelineDataSourceWrapper.class);
-        PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSource);
-        return new InventoryTask(dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtils.getPipelineChannelCreator(), mockImporterConnector(),
-                dataSource, metaDataLoader, PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext());
-    }
-    
-    private ImporterConnector mockImporterConnector() {
-        return new DataSourceImporterConnector(new DefaultPipelineDataSourceManager());
+        return new InventoryTask(PipelineTaskUtils.generateInventoryTaskId(dumperConfig), PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(),
+                mock(Dumper.class), mock(Importer.class), new AtomicReference<>(new PlaceholderPosition()));
     }
 }
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporter.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporter.java
index c0c6a27ed82..23fb4908f97 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporter.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporter.java
@@ -21,11 +21,11 @@ import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import org.apache.shardingsphere.data.pipeline.spi.importer.sink.PipelineSink;
 
 public final class FixtureImporter implements Importer {
     
-    public FixtureImporter(final ImporterConfiguration importerConfig, final ImporterConnector importerConnector, final PipelineChannel channel,
+    public FixtureImporter(final ImporterConfiguration importerConfig, final PipelineSink pipelineSink, final PipelineChannel channel,
                            final PipelineJobProgressListener jobProgressListener) {
     }
     
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporterCreator.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporterCreator.java
deleted file mode 100644
index 8eb83e501f8..00000000000
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporterCreator.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
-
-import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
-import org.apache.shardingsphere.data.pipeline.api.importer.ImporterType;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
-
-/**
- * Fixture importer creator.
- */
-public final class FixtureImporterCreator implements ImporterCreator {
-    
-    @Override
-    public Importer createImporter(final ImporterConfiguration importerConfig,
-                                   final ImporterConnector importerConnector, final PipelineChannel channel,
-                                   final PipelineJobProgressListener jobProgressListener, final ImporterType importerType) {
-        return new FixtureImporter(importerConfig, importerConnector, channel, jobProgressListener);
-    }
-    
-    @Override
-    public String getType() {
-        return "FIXTURE";
-    }
-}
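
Note: with the ImporterCreator fixture deleted, nothing resolves importers through SPI lookup any more; test code constructs them directly against the new PipelineSink parameter. A minimal sketch under that assumption (mock(...) is Mockito; importerConfig, channel and jobProgressListener stand in for the same arguments the deleted creator used to receive):

    // Direct construction replaces the SPI factory; the sink is mocked here
    // because this fixture importer ignores its collaborators anyway.
    Importer importer = new FixtureImporter(importerConfig, mock(PipelineSink.class), channel, jobProgressListener);
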
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureInventoryIncrementalJobItemContext.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureInventoryIncrementalJobItemContext.java
index e344c3123e9..8cd699c3f11 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureInventoryIncrementalJobItemContext.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureInventoryIncrementalJobItemContext.java
@@ -25,7 +25,7 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTable
 import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import org.apache.shardingsphere.data.pipeline.spi.importer.sink.PipelineSink;
 
 import java.util.Collection;
 
@@ -61,7 +61,7 @@ public final class FixtureInventoryIncrementalJobItemContext implements Inventor
     }
     
     @Override
-    public ImporterConnector getImporterConnector() {
+    public PipelineSink getSink() {
         return null;
     }
     
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
similarity index 86%
rename from test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
rename to test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
index a02398e7e49..07ad96e23ea 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
@@ -31,9 +31,9 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import org.apache.shardingsphere.data.pipeline.core.importer.DataSourceImporter;
-import org.apache.shardingsphere.data.pipeline.core.importer.connector.DataSourceImporterConnector;
-import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
+import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineDataSourceSink;
+import org.apache.shardingsphere.data.pipeline.spi.importer.sink.PipelineSink;
 import org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixtureInventoryIncrementalJobItemContext;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -50,15 +50,17 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
-class DataSourceImporterTest {
+class PipelineDataSourceSinkTest {
     
     private static final String TABLE_NAME = "test_table";
     
@@ -80,12 +82,12 @@ class DataSourceImporterTest {
     @Mock
     private PreparedStatement preparedStatement;
     
-    private DataSourceImporter jdbcImporter;
+    private SingleChannelConsumerImporter importer;
     
     @BeforeEach
     void setUp() throws SQLException {
-        ImporterConnector importerConnector = new DataSourceImporterConnector(dataSourceManager);
-        jdbcImporter = new DataSourceImporter(mockImporterConfiguration(), importerConnector, channel, new FixtureInventoryIncrementalJobItemContext());
+        PipelineSink pipelineSink = new PipelineDataSourceSink(mockImporterConfiguration(), dataSourceManager);
+        importer = new SingleChannelConsumerImporter(channel, 100, 1, TimeUnit.SECONDS, pipelineSink, new FixtureInventoryIncrementalJobItemContext());
         when(dataSourceManager.getDataSource(dataSourceConfig)).thenReturn(dataSource);
         when(dataSource.getConnection()).thenReturn(connection);
     }
@@ -94,8 +96,8 @@ class DataSourceImporterTest {
     void assertWriteInsertDataRecord() throws SQLException {
         DataRecord insertRecord = getDataRecord("INSERT");
         when(connection.prepareStatement(any())).thenReturn(preparedStatement);
-        when(channel.fetchRecords(anyInt(), anyInt(), any())).thenReturn(mockRecords(insertRecord));
-        jdbcImporter.run();
+        when(channel.fetchRecords(anyInt(), anyLong(), any())).thenReturn(mockRecords(insertRecord));
+        importer.run();
         verify(preparedStatement).setObject(1, 1);
         verify(preparedStatement).setObject(2, 10);
         verify(preparedStatement).setObject(3, "INSERT");
@@ -106,9 +108,9 @@ class DataSourceImporterTest {
     void assertDeleteDataRecord() throws SQLException {
         DataRecord deleteRecord = getDataRecord("DELETE");
         when(connection.prepareStatement(any())).thenReturn(preparedStatement);
-        when(channel.fetchRecords(anyInt(), anyInt(), any())).thenReturn(mockRecords(deleteRecord));
+        when(channel.fetchRecords(anyInt(), anyLong(), any())).thenReturn(mockRecords(deleteRecord));
         when(preparedStatement.executeBatch()).thenReturn(new int[]{1});
-        jdbcImporter.run();
+        importer.run();
         verify(preparedStatement).setObject(1, 1);
         verify(preparedStatement).setObject(2, 10);
         verify(preparedStatement).addBatch();
@@ -118,8 +120,8 @@ class DataSourceImporterTest {
     void assertUpdateDataRecord() throws SQLException {
         DataRecord updateRecord = getDataRecord("UPDATE");
         when(connection.prepareStatement(any())).thenReturn(preparedStatement);
-        when(channel.fetchRecords(anyInt(), anyInt(), any())).thenReturn(mockRecords(updateRecord));
-        jdbcImporter.run();
+        when(channel.fetchRecords(anyInt(), anyLong(), any())).thenReturn(mockRecords(updateRecord));
+        importer.run();
         verify(preparedStatement).setObject(1, 20);
         verify(preparedStatement).setObject(2, "UPDATE");
         verify(preparedStatement).setObject(3, 1);
@@ -131,8 +133,8 @@ class DataSourceImporterTest {
     void assertUpdatePrimaryKeyDataRecord() throws SQLException {
         DataRecord updateRecord = getUpdatePrimaryKeyDataRecord();
         when(connection.prepareStatement(any())).thenReturn(preparedStatement);
-        when(channel.fetchRecords(anyInt(), anyInt(), any())).thenReturn(mockRecords(updateRecord));
-        jdbcImporter.run();
+        when(channel.fetchRecords(anyInt(), anyLong(), any())).thenReturn(mockRecords(updateRecord));
+        importer.run();
         InOrder inOrder = inOrder(preparedStatement);
         inOrder.verify(preparedStatement).setObject(1, 2);
         inOrder.verify(preparedStatement).setObject(2, 10);
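
Note: this renamed test captures the split of the old DataSourceImporter into two pieces: SingleChannelConsumerImporter owns the fetch loop against the channel (hence fetchRecords now taking a long timeout alongside a TimeUnit), and PipelineDataSourceSink owns the JDBC write path. The wiring, condensed from the updated setUp() (batch size 100 and the one-second fetch timeout mirror the test and are not mandated values):

    // The importer is now a generic channel consumer that hands each fetched
    // batch to a sink; swapping the sink changes the write target (JDBC here,
    // a CDC socket elsewhere) without touching the consumption loop.
    PipelineSink sink = new PipelineDataSourceSink(mockImporterConfiguration(), dataSourceManager);
    Importer importer = new SingleChannelConsumerImporter(channel, 100, 1, TimeUnit.SECONDS, sink,
            new FixtureInventoryIncrementalJobItemContext());
    importer.run();
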
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index be205ea76ca..a6e76f8d71e 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -75,7 +75,7 @@ class InventoryTaskSplitterTest {
     private void initJobItemContext() {
         MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
         jobItemContext = PipelineContextUtils.mockMigrationJobItemContext(jobConfig);
-        dataSourceManager = (PipelineDataSourceManager) jobItemContext.getImporterConnector().getConnector();
+        dataSourceManager = jobItemContext.getDataSourceManager();
     }
     
     @AfterEach
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java
index 49d84f74c7a..f129614ffaf 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java
@@ -17,14 +17,12 @@
 
 package org.apache.shardingsphere.test.it.data.pipeline.core.task;
 
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
-import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
-import org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixtureImporterConnector;
-import org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixtureInventoryIncrementalJobItemContext;
 import org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
 import org.junit.jupiter.api.AfterEach;
@@ -33,6 +31,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -56,10 +55,8 @@ class IncrementalTaskTest {
     void setUp() {
         MigrationTaskConfiguration taskConfig = PipelineContextUtils.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration()).getTaskConfig();
         taskConfig.getDumperConfig().setPosition(new PlaceholderPosition());
-        PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
-        incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
-                PipelineContextUtils.getPipelineChannelCreator(), new FixtureImporterConnector(), metaDataLoader, PipelineContextUtils.getExecuteEngine(),
-                new FixtureInventoryIncrementalJobItemContext());
+        incrementalTask = new IncrementalTask("ds_0", PipelineContextUtils.getExecuteEngine(), mock(Dumper.class),
+                Collections.singletonList(mock(Importer.class)), new IncrementalTaskProgress(new PlaceholderPosition()));
     }
     
     @AfterEach
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
index 8aeb66607a5..5c98f92a38d 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
@@ -21,14 +21,18 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
-import org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixtureImporterConnector;
-import org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixtureInventoryIncrementalJobItemContext;
 import org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
 import org.junit.jupiter.api.AfterAll;
@@ -44,10 +48,12 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
 
 class InventoryTaskTest {
     
@@ -73,13 +79,14 @@ class InventoryTaskTest {
     @Test
     void assertStartWithGetEstimatedRowsFailure() {
         InventoryDumperConfiguration inventoryDumperConfig = createInventoryDumperConfiguration("t_non_exist", "t_non_exist");
+        AtomicReference<IngestPosition> position = new AtomicReference<>(inventoryDumperConfig.getPosition());
+        PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(PipelineContextUtils.getPipelineChannelCreator(), 100, position);
         PipelineDataSourceWrapper dataSource = DATA_SOURCE_MANAGER.getDataSource(inventoryDumperConfig.getDataSourceConfig());
         PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSource);
-        InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
-                PipelineContextUtils.getPipelineChannelCreator(), new FixtureImporterConnector(), dataSource,
-                metaDataLoader, PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext());
+        InventoryTask inventoryTask = new InventoryTask(PipelineTaskUtils.generateInventoryTaskId(inventoryDumperConfig),
+                PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(),
+                new InventoryDumper(inventoryDumperConfig, channel, dataSource, metaDataLoader), mock(Importer.class), position);
         assertThrows(ExecutionException.class, () -> CompletableFuture.allOf(inventoryTask.start().toArray(new CompletableFuture[0])).get(10L, TimeUnit.SECONDS));
-        inventoryTask.close();
     }
     
     @Test
@@ -87,11 +94,9 @@ class InventoryTaskTest {
         initTableData(taskConfig.getDumperConfig());
         // TODO use t_order_0, and also others
         InventoryDumperConfiguration inventoryDumperConfig = createInventoryDumperConfiguration("t_order", "t_order");
-        PipelineDataSourceWrapper dataSource = DATA_SOURCE_MANAGER.getDataSource(inventoryDumperConfig.getDataSourceConfig());
-        PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSource);
-        InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
-                PipelineContextUtils.getPipelineChannelCreator(), new FixtureImporterConnector(), dataSource,
-                metaDataLoader, PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(), new FixtureInventoryIncrementalJobItemContext());
+        AtomicReference<IngestPosition> position = new AtomicReference<>(inventoryDumperConfig.getPosition());
+        InventoryTask inventoryTask = new InventoryTask(PipelineTaskUtils.generateInventoryTaskId(inventoryDumperConfig),
+                PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(), mock(Dumper.class), mock(Importer.class), position);
         CompletableFuture.allOf(inventoryTask.start().toArray(new CompletableFuture[0])).get(10L, TimeUnit.SECONDS);
         assertThat(inventoryTask.getTaskProgress().getPosition(), instanceOf(IntegerPrimaryKeyPosition.class));
         inventoryTask.close();
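
Note: the real-dumper variant above also shows where the shared AtomicReference comes from: PipelineTaskUtils.createInventoryChannel binds the channel's acknowledged position to the same reference the task reports from, so getTaskProgress().getPosition() reflects dumper progress without the task holding the channel itself. Condensed from the test (dataSource and metaDataLoader are built in the test body; the 100 mirrors the test's channel sizing argument):

    AtomicReference<IngestPosition> position = new AtomicReference<>(inventoryDumperConfig.getPosition());
    PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(
            PipelineContextUtils.getPipelineChannelCreator(), 100, position);
    InventoryTask task = new InventoryTask(PipelineTaskUtils.generateInventoryTaskId(inventoryDumperConfig),
            PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(),
            new InventoryDumper(inventoryDumperConfig, channel, dataSource, metaDataLoader),
            mock(Importer.class), position);
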
diff --git a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator
deleted file mode 100644
index 2d3a03a50cc..00000000000
--- a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixtureImporterCreator
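
Note: deleting the services registration file together with FixtureImporterCreator is not optional cleanup: java.util.ServiceLoader instantiates every class named in a META-INF/services entry, so a stale line pointing at a removed class would raise a ServiceConfigurationError during provider iteration. A generic JDK illustration (MyService and doSomething() are placeholders, not project types):

    // ServiceLoader reads META-INF/services/<interface FQCN> and reflectively
    // loads each listed implementation, so entries must track class deletions.
    ServiceLoader<MyService> loader = ServiceLoader.load(MyService.class);
    for (MyService service : loader) {
        service.doSomething(); // hypothetical method on the placeholder interface
    }
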