You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/01/08 11:58:36 UTC

[shardingsphere] branch master updated: Add streamChannel in scaling config and refactor blockQueueSize config; Add PipelineChannelFactory SPI and MEMORY impl; Integrate streamChannel with incremental and inventory task (#14624)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 67e6590  Add streamChannel in scaling config and refactor blockQueueSize config; Add PipelineChannelFactory SPI and MEMORY impl; Integrate streamChannel with incremental and inventory task (#14624)
67e6590 is described below

commit 67e6590665caf2724de6fa96f272b369759c3172
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Sat Jan 8 19:57:30 2022 +0800

    Add streamChannel in scaling config and refactor blockQueueSize config; Add PipelineChannelFactory SPI and MEMORY impl; Integrate streamChannel with incremental and inventory task (#14624)
---
 .../user-manual/shardingsphere-scaling/build.cn.md | 10 +++-
 .../user-manual/shardingsphere-scaling/build.en.md | 10 +++-
 .../resources/yaml/encrypt-dataConverters.yaml     |  5 +-
 ...hardingRuleAlteredJobConfigurationPreparer.java | 14 +----
 .../src/test/resources/yaml/sharding-rule.yaml     |  5 +-
 .../src/test/resources/yaml/sharding-scaling.yaml  |  5 +-
 .../OnRuleAlteredActionConfiguration.java          |  4 +-
 .../YamlOnRuleAlteredActionConfiguration.java      |  4 +-
 ...nRuleAlteredActionConfigurationYamlSwapper.java |  5 +-
 ...eAlteredActionConfigurationYamlSwapperTest.java |  6 +-
 .../distribution/AutoAcknowledgeChannel.java       |  2 +-
 .../ingest/channel/distribution/BitSetChannel.java |  7 +--
 .../channel/distribution/BlockingQueueChannel.java | 12 ++--
 .../ingest/channel/distribution/MemoryChannel.java | 10 ++--
 .../ingest/dumper/AbstractInventoryDumper.java     |  5 +-
 .../core/prepare/InventoryTaskSplitter.java        |  6 +-
 .../channel/MemoryPipelineChannelFactory.java      | 68 ++++++++++++++++++++++
 .../data/pipeline/core/task/IncrementalTask.java   | 12 +++-
 .../data/pipeline/core/task/InventoryTask.java     | 11 +++-
 .../scenario/rulealtered/RuleAlteredContext.java   | 12 ++++
 .../rulealtered/RuleAlteredJobPreparer.java        |  7 ++-
 ...eline.spi.ingest.channel.PipelineChannelFactory | 27 +--------
 .../mysql/ingest/MySQLIncrementalDumper.java       |  5 +-
 .../opengauss/ingest/OpenGaussWalDumper.java       |  5 +-
 .../postgresql/ingest/PostgreSQLWalDumper.java     |  5 +-
 .../api/config/ingest/DumperConfiguration.java     |  2 -
 .../ingest/InventoryDumperConfiguration.java       |  1 -
 .../pipeline/api}/ingest/channel/AckCallback.java  |  2 +-
 .../data/pipeline/api/ingest/channel/Channel.java  | 13 +++--
 .../ingest/channel/PipelineChannelFactory.java}    | 21 ++++---
 .../src/main/resources/conf/config-sharding.yaml   | 10 +++-
 .../api/impl/GovernanceRepositoryAPIImplTest.java  |  4 +-
 .../channel/distribution/MemoryChannelTest.java    |  2 +-
 .../core/prepare/InventoryTaskSplitterTest.java    | 16 +++--
 .../pipeline/core/task/IncrementalTaskTest.java    |  3 +-
 .../data/pipeline/core/task/InventoryTaskTest.java |  6 +-
 .../pipeline/core/util/RuleAlteredContextUtil.java | 13 +++++
 .../config_sharding_sphere_jdbc_source.yaml        |  5 +-
 .../config_sharding_sphere_jdbc_target.yaml        |  5 +-
 39 files changed, 238 insertions(+), 127 deletions(-)

diff --git a/docs/document/content/user-manual/shardingsphere-scaling/build.cn.md b/docs/document/content/user-manual/shardingsphere-scaling/build.cn.md
index 50547dc..9ea5edd 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/build.cn.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/build.cn.md
@@ -53,7 +53,6 @@ rules:
   scalingName: # 启用的弹性伸缩配置名称
   scaling:
     <scaling-action-config-name> (+):
-      blockQueueSize: # 数据通道阻塞队列大小
       input:
         workerThread: # 从源端摄取全量数据的线程池大小
         batchSize: # 一次查询操作返回的最大记录数
@@ -68,6 +67,10 @@ rules:
           type: # 算法类型。可选项:TPS
           props: # 算法属性
             tps: # tps属性。适用算法类型:TPS
+      streamChannel: # 数据通道,连接生产者和消费者,用于 input 和 output 环节。如果没配置则默认使用 MEMORY 类型
+        type: # 算法类型。可选项:MEMORY
+        props: # 算法属性
+          block-queue-size: # 属性:阻塞队列大小
       completionDetector: # 作业是否接近完成检测算法。如果不配置,那么系统无法自动进行后续步骤,可以通过 DistSQL 手动操作。
         type: # 算法类型。可选项:IDLE
         props: # 算法属性
@@ -87,7 +90,6 @@ rules:
   scalingName: default_scaling
   scaling:
     default_scaling:
-      blockQueueSize: 10000
       input:
         workerThread: 40
         batchSize: 1000
@@ -102,6 +104,10 @@ rules:
           type: TPS
           props:
             tps: 2000
+      streamChannel:
+        type: MEMORY
+        props:
+          block-queue-size: 10000
       completionDetector:
         type: IDLE
         props:
diff --git a/docs/document/content/user-manual/shardingsphere-scaling/build.en.md b/docs/document/content/user-manual/shardingsphere-scaling/build.en.md
index 3fe6b41..fa58430 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/build.en.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/build.en.md
@@ -52,7 +52,6 @@ rules:
   scalingName: # Enabled scaling action config name
   scaling:
     <scaling-action-config-name> (+):
-      blockQueueSize: # Data channel blocking queue size
       input:
         workerThread: # Worker thread pool size for inventory data ingestion from source
         batchSize: # Maximum records count of a DML select operation
@@ -67,6 +66,10 @@ rules:
           type: # Algorithm type. Options: TPS
           props: # Algorithm properties
             tps: # TPS property. Available for types: TPS
+      streamChannel: # Algorithm of the channel that connects producer and consumer, used for input and output. If it's not configured, the system will use the MEMORY type
+        type: # Algorithm type. Options: MEMORY
+        props: # Algorithm properties
+          block-queue-size: # Property: data channel block queue size. Available for types: MEMORY
       completionDetector: # Completion detect algorithm. If it's not configured, then system won't continue to do next steps automatically.
         type: # Algorithm type. Options: IDLE
         props: # Algorithm properties
@@ -86,7 +89,6 @@ rules:
   scalingName: default_scaling
   scaling:
     default_scaling:
-      blockQueueSize: 10000
       input:
         workerThread: 40
         batchSize: 1000
@@ -101,6 +103,10 @@ rules:
           type: TPS
           props:
             tps: 2000
+      streamChannel:
+        type: MEMORY
+        props:
+          block-queue-size: 10000
       completionDetector:
         type: IDLE
         props:
diff --git a/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml b/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
index 39d44da..04a1ae2 100644
--- a/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
+++ b/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
@@ -18,7 +18,6 @@
 dataConverterName: default_convert
 dataConverters:
   default_convert:
-    blockQueueSize: 10000
     input:
       workerThread: 40
       batchSize: 1000
@@ -33,6 +32,10 @@ dataConverters:
         type: TPS
         props:
           tps: 2000
+    streamChannel:
+      type: MEMORY
+      props:
+        block-queue-size: 10000
     completionDetector:
       type: IDLE
       props:
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparer.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparer.java
index b761b6e..aa2419c 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparer.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparer.java
@@ -34,7 +34,6 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Shardi
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparer;
 import org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
-import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
 import org.apache.shardingsphere.infra.datanode.DataNode;
 import org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
 import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
@@ -140,18 +139,13 @@ public final class ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
                 tableMap.put(dataNode.getTableName(), each.getLogicTableName());
             }
         }
-        OnRuleAlteredActionConfiguration ruleAlteredActionConfig = getRuleAlteredActionConfig(targetRuleConfig.orElse(sourceRuleConfig)).orElse(null);
-        DumperConfiguration dumperConfig = createDumperConfig(dataSourceName, sourceDataSource.get(dataSourceName).getProps(), tableMap, ruleAlteredActionConfig);
+        DumperConfiguration dumperConfig = createDumperConfig(dataSourceName, sourceDataSource.get(dataSourceName).getProps(), tableMap);
         ImporterConfiguration importerConfig = createImporterConfig(pipelineConfig, handleConfig, shardingColumnsMap);
         TaskConfiguration taskConfig = new TaskConfiguration(handleConfig, dumperConfig, importerConfig);
         log.info("toTaskConfigs, dataSourceName={}, taskConfig={}", dataSourceName, taskConfig);
         return Collections.singletonList(taskConfig);
     }
     
-    private Optional<OnRuleAlteredActionConfiguration> getRuleAlteredActionConfig(final ShardingRuleConfiguration shardingRuleConfig) {
-        return Optional.ofNullable(shardingRuleConfig.getScaling().get(shardingRuleConfig.getScalingName()));
-    }
-    
     private static ShardingSpherePipelineDataSourceConfiguration getSourceConfiguration(final PipelineConfiguration pipelineConfig) {
         PipelineDataSourceConfiguration result = PipelineDataSourceConfigurationFactory.newInstance(pipelineConfig.getSource().getType(), pipelineConfig.getSource().getParameter());
         return (ShardingSpherePipelineDataSourceConfiguration) result;
@@ -194,15 +188,11 @@ public final class ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
         return Collections.emptySet();
     }
     
-    private static DumperConfiguration createDumperConfig(final String dataSourceName, final Map<String, Object> props, final Map<String, String> tableMap,
-                                                          final OnRuleAlteredActionConfiguration ruleAlteredActionConfig) {
+    private static DumperConfiguration createDumperConfig(final String dataSourceName, final Map<String, Object> props, final Map<String, String> tableMap) {
         DumperConfiguration result = new DumperConfiguration();
         result.setDataSourceName(dataSourceName);
         result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration(YamlEngine.marshal(props)));
         result.setTableNameMap(tableMap);
-        if (null != ruleAlteredActionConfig) {
-            result.setBlockQueueSize(ruleAlteredActionConfig.getBlockQueueSize());
-        }
         return result;
     }
     
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
index 4dd83cf..58f8850 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
@@ -108,7 +108,6 @@ rules:
   scalingName: default_scaling
   scaling:
     default_scaling:
-      blockQueueSize: 10000
       input:
         workerThread: 40
         batchSize: 1000
@@ -123,6 +122,10 @@ rules:
           type: TPS
           props:
             tps: 2000
+      streamChannel:
+        type: MEMORY
+        props:
+          block-queue-size: 10000
       completionDetector:
         type: IDLE
         props:
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
index bb8027d..20ee9fe 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
@@ -18,7 +18,6 @@
 scalingName: default_scaling
 scaling:
   default_scaling:
-    blockQueueSize: 10000
     input:
       workerThread: 40
       batchSize: 1000
@@ -33,6 +32,10 @@ scaling:
         type: TPS
         props:
           tps: 2000
+    streamChannel:
+      type: MEMORY
+      props:
+        block-queue-size: 10000
     completionDetector:
       type: IDLE
       props:
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
index 07a82a1..f917025 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
@@ -30,12 +30,12 @@ import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmC
 @ToString
 public final class OnRuleAlteredActionConfiguration {
     
-    private final int blockQueueSize;
-    
     private final InputConfiguration input;
     
     private final OutputConfiguration output;
     
+    private final ShardingSphereAlgorithmConfiguration streamChannel;
+    
     private final ShardingSphereAlgorithmConfiguration completionDetector;
     
     private final ShardingSphereAlgorithmConfiguration dataConsistencyChecker;
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
index 4934ea8..d8875e7 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
@@ -32,12 +32,12 @@ import org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlShardingSp
 @ToString
 public final class YamlOnRuleAlteredActionConfiguration implements YamlConfiguration {
     
-    private int blockQueueSize = 10000;
-    
     private YamlInputConfiguration input;
     
     private YamlOutputConfiguration output;
     
+    private YamlShardingSphereAlgorithmConfiguration streamChannel;
+    
     private YamlShardingSphereAlgorithmConfiguration completionDetector;
     
     private YamlShardingSphereAlgorithmConfiguration dataConsistencyChecker;
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
index 11e959e..744e171 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
@@ -43,9 +43,9 @@ public final class OnRuleAlteredActionConfigurationYamlSwapper implements YamlCo
             return null;
         }
         YamlOnRuleAlteredActionConfiguration result = new YamlOnRuleAlteredActionConfiguration();
-        result.setBlockQueueSize(data.getBlockQueueSize());
         result.setInput(INPUT_CONFIG_SWAPPER.swapToYamlConfiguration(data.getInput()));
         result.setOutput(OUTPUT_CONFIG_SWAPPER.swapToYamlConfiguration(data.getOutput()));
+        result.setStreamChannel(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getStreamChannel()));
         result.setCompletionDetector(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getCompletionDetector()));
         result.setDataConsistencyChecker(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getDataConsistencyChecker()));
         return result;
@@ -56,9 +56,10 @@ public final class OnRuleAlteredActionConfigurationYamlSwapper implements YamlCo
         if (null == yamlConfig) {
             return null;
         }
-        return new OnRuleAlteredActionConfiguration(yamlConfig.getBlockQueueSize(),
+        return new OnRuleAlteredActionConfiguration(
                 INPUT_CONFIG_SWAPPER.swapToObject(yamlConfig.getInput()),
                 OUTPUT_CONFIG_SWAPPER.swapToObject(yamlConfig.getOutput()),
+                ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getStreamChannel()),
                 ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getCompletionDetector()),
                 ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getDataConsistencyChecker()));
     }
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
index 25b9f8f..e8b0a67 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
@@ -38,20 +38,22 @@ public final class OnRuleAlteredActionConfigurationYamlSwapperTest {
     @Test
     public void assertSwap() {
         YamlOnRuleAlteredActionConfiguration yamlConfig = new YamlOnRuleAlteredActionConfiguration();
-        yamlConfig.setBlockQueueSize(1000);
         Properties rateLimiterProps = new Properties();
         rateLimiterProps.setProperty("batch-size", "1000");
         rateLimiterProps.setProperty("qps", "50");
         YamlInputConfiguration yamlInputConfig = new YamlInputConfiguration();
+        yamlConfig.setInput(yamlInputConfig);
         yamlInputConfig.setWorkerThread(40);
         yamlInputConfig.setBatchSize(1000);
         yamlInputConfig.setRateLimiter(new YamlShardingSphereAlgorithmConfiguration("INPUT", rateLimiterProps));
-        yamlConfig.setInput(yamlInputConfig);
         YamlOutputConfiguration yamlOutputConfig = new YamlOutputConfiguration();
         yamlOutputConfig.setWorkerThread(40);
         yamlOutputConfig.setBatchSize(1000);
         yamlOutputConfig.setRateLimiter(new YamlShardingSphereAlgorithmConfiguration("OUTPUT", rateLimiterProps));
         yamlConfig.setOutput(yamlOutputConfig);
+        Properties streamChannelProps = new Properties();
+        streamChannelProps.setProperty("block-queue-size", "10000");
+        yamlConfig.setStreamChannel(new YamlShardingSphereAlgorithmConfiguration("MEMORY", streamChannelProps));
         Properties completionDetectorProps = new Properties();
         completionDetectorProps.setProperty("incremental-task-idle-minute-threshold", "30");
         yamlConfig.setCompletionDetector(new YamlShardingSphereAlgorithmConfiguration("IDLE", completionDetectorProps));
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/AutoAcknowledgeChannel.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/AutoAcknowledgeChannel.java
index 2d04a30..ea1e508 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/AutoAcknowledgeChannel.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/AutoAcknowledgeChannel.java
@@ -34,7 +34,7 @@ public final class AutoAcknowledgeChannel extends AbstractBitSetChannel {
     }
     
     @Override
-    public List<Record> fetchRecords(final int batchSize, final int timeout) {
+    public List<Record> fetchRecords(final int batchSize, final int timeoutSeconds) {
         throw new UnsupportedOperationException("Auto ack channel can not fetch records.");
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/BitSetChannel.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/BitSetChannel.java
index 77a4cde..0ca3b30 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/BitSetChannel.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/BitSetChannel.java
@@ -32,18 +32,17 @@ public interface BitSetChannel {
      *
      * @param dataRecord data
      * @param index data index
-     * @throws InterruptedException if thread interrupted
      */
-    void pushRecord(Record dataRecord, long index) throws InterruptedException;
+    void pushRecord(Record dataRecord, long index);
     
     /**
      * Fetch {@code Record} from channel, if the timeout also returns the record.
      *
      * @param batchSize record batch size
-     * @param timeout timeout(seconds)
+     * @param timeoutSeconds timeout(seconds)
      * @return record
      */
-    List<Record> fetchRecords(int batchSize, int timeout);
+    List<Record> fetchRecords(int batchSize, int timeoutSeconds);
     
     /**
      * Ack the last batch.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/BlockingQueueChannel.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/BlockingQueueChannel.java
index b4d4c24..e88cebd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/BlockingQueueChannel.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/BlockingQueueChannel.java
@@ -43,18 +43,22 @@ public final class BlockingQueueChannel extends AbstractBitSetChannel {
     }
     
     @Override
-    public void pushRecord(final Record dataRecord, final long index) throws InterruptedException {
+    public void pushRecord(final Record dataRecord, final long index) {
         getManualBitSet().set(index);
-        queue.put(dataRecord);
+        try {
+            queue.put(dataRecord);
+        } catch (final InterruptedException ex) {
+            throw new RuntimeException("put " + dataRecord + " into queue at index " + index + " failed", ex);
+        }
     }
     
     // TODO thread-safe?
     @Override
-    public List<Record> fetchRecords(final int batchSize, final int timeout) {
+    public List<Record> fetchRecords(final int batchSize, final int timeoutSeconds) {
         List<Record> result = new ArrayList<>(batchSize);
         long start = System.currentTimeMillis();
         while (batchSize > queue.size()) {
-            if (timeout * 1000L <= System.currentTimeMillis() - start) {
+            if (timeoutSeconds * 1000L <= System.currentTimeMillis() - start) {
                 break;
             }
             ThreadUtil.sleep(100L);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryChannel.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryChannel.java
index bc13ae1..b704955 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryChannel.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryChannel.java
@@ -18,12 +18,12 @@
 package org.apache.shardingsphere.data.pipeline.core.ingest.channel.distribution;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.AckCallback;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.core.ingest.channel.AckCallback;
 
 import java.util.BitSet;
 import java.util.HashMap;
@@ -87,7 +87,7 @@ public final class MemoryChannel implements Channel {
     }
     
     @Override
-    public void pushRecord(final Record record) throws InterruptedException {
+    public void pushRecord(final Record record) {
         if (FinishedRecord.class.equals(record.getClass())) {
             for (int i = 0; i < channels.length; i++) {
                 pushRecord(record, i);
@@ -101,14 +101,14 @@ public final class MemoryChannel implements Channel {
         }
     }
     
-    private void pushRecord(final Record record, final int index) throws InterruptedException {
+    private void pushRecord(final Record record, final int index) {
         toBeAckBitSetIndexes.add(index);
         getBitSetChannel(index).pushRecord(record, indexAutoIncreaseGenerator.getAndIncrement());
     }
     
     @Override
-    public List<Record> fetchRecords(final int batchSize, final int timeout) {
-        return findChannel().fetchRecords(batchSize, timeout);
+    public List<Record> fetchRecords(final int batchSize, final int timeoutSeconds) {
+        return findChannel().fetchRecords(batchSize, timeoutSeconds);
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
index 6860835..98b39bb 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
@@ -187,9 +187,6 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
     }
     
     private void pushRecord(final Record record) {
-        try {
-            channel.pushRecord(record);
-        } catch (final InterruptedException ignored) {
-        }
+        channel.pushRecord(record);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index b7a1b03..0b2cd1a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepare
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration.InputConfiguration;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
@@ -64,15 +65,16 @@ public final class InventoryTaskSplitter {
      * @param jobContext job context
      * @param taskConfig task configuration
      * @param dataSourceManager data source manager
+     * @param pipelineChannelFactory channel factory
      * @param importerExecuteEngine execute engine
      * @return split inventory data task
      */
     // TODO remove jobContext, use init JobProgress -  sourceDatabaseType - readBatchSize - rateLimitAlgorithm
     public List<InventoryTask> splitInventoryData(final RuleAlteredJobContext jobContext, final TaskConfiguration taskConfig, final PipelineDataSourceManager dataSourceManager,
-                                                  final ExecuteEngine importerExecuteEngine) {
+                                                  final PipelineChannelFactory pipelineChannelFactory, final ExecuteEngine importerExecuteEngine) {
         List<InventoryTask> result = new LinkedList<>();
         for (InventoryDumperConfiguration each : splitDumperConfig(jobContext, taskConfig.getDumperConfig(), dataSourceManager)) {
-            result.add(new InventoryTask(each, taskConfig.getImporterConfig(), importerExecuteEngine));
+            result.add(new InventoryTask(each, taskConfig.getImporterConfig(), pipelineChannelFactory, importerExecuteEngine));
         }
         return result;
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ingest/channel/MemoryPipelineChannelFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ingest/channel/MemoryPipelineChannelFactory.java
new file mode 100644
index 0000000..44d00fd
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ingest/channel/MemoryPipelineChannelFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.spi.ingest.channel;
+
+import com.google.common.base.Strings;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.AckCallback;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import org.apache.shardingsphere.data.pipeline.core.ingest.channel.distribution.MemoryChannel;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
+
+import java.util.Properties;
+
+/**
+ * Memory implementation of pipeline channel factory.
+ *
+ * <p>Creates {@link MemoryChannel} instances. The queue size handed to each channel defaults to 10000
+ * and can be overridden with the {@code block-queue-size} property.</p>
+ */
+public final class MemoryPipelineChannelFactory implements PipelineChannelFactory {
+    
+    public static final String TYPE = "MEMORY";
+    
+    private static final String BLOCK_QUEUE_SIZE_KEY = "block-queue-size";
+    
+    private static final int DEFAULT_BLOCK_QUEUE_SIZE = 10000;
+    
+    private int blockQueueSize = DEFAULT_BLOCK_QUEUE_SIZE;
+    
+    private Properties props = new Properties();
+    
+    @Override
+    public Properties getProps() {
+        return props;
+    }
+    
+    @Override
+    public void setProps(final Properties props) {
+        this.props = props;
+    }
+    
+    /**
+     * Read the {@code block-queue-size} property; the default is kept when the property is absent or empty.
+     *
+     * @throws NumberFormatException if the configured value is not a parsable integer
+     * @throws IllegalArgumentException if the configured value is not positive
+     */
+    @Override
+    public void init() {
+        String blockQueueSizeValue = props.getProperty(BLOCK_QUEUE_SIZE_KEY);
+        if (Strings.isNullOrEmpty(blockQueueSizeValue)) {
+            return;
+        }
+        int configuredBlockQueueSize = Integer.parseInt(blockQueueSizeValue);
+        // Fail fast on a bad configuration: the value is handed to MemoryChannel unchecked when a channel is created.
+        if (configuredBlockQueueSize <= 0) {
+            throw new IllegalArgumentException(String.format("Property `%s` must be a positive integer, but was `%s`", BLOCK_QUEUE_SIZE_KEY, blockQueueSizeValue));
+        }
+        blockQueueSize = configuredBlockQueueSize;
+    }
+    
+    @Override
+    public Channel createPipelineChannel(final int outputConcurrency, final AckCallback ackCallback) {
+        return new MemoryChannel(outputConcurrency, blockQueueSize, ackCallback);
+    }
+    
+    @Override
+    public String getType() {
+        return TYPE;
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 3c1ce54..c1289c0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
@@ -30,9 +31,9 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import org.apache.shardingsphere.data.pipeline.core.ingest.channel.distribution.MemoryChannel;
 import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterListener;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
 import org.apache.shardingsphere.scaling.core.job.dumper.DumperFactory;
 import org.apache.shardingsphere.scaling.core.job.importer.ImporterFactory;
@@ -53,12 +54,15 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements
     @Getter
     private final String taskId;
     
+    // TODO Move into the `output` config and ImporterConfiguration. Why is it needed only for the incremental task and not for the inventory task?
     private final int concurrency;
     
     private final DumperConfiguration dumperConfig;
     
     private final ImporterConfiguration importerConfig;
     
+    private final PipelineChannelFactory pipelineChannelFactory;
+    
     private final ExecuteEngine incrementalDumperExecuteEngine;
     
     private final PipelineDataSourceManager dataSourceManager;
@@ -68,10 +72,12 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements
     @Getter
     private final IncrementalTaskProgress progress;
     
-    public IncrementalTask(final int concurrency, final DumperConfiguration dumperConfig, final ImporterConfiguration importerConfig, final ExecuteEngine incrementalDumperExecuteEngine) {
+    public IncrementalTask(final int concurrency, final DumperConfiguration dumperConfig, final ImporterConfiguration importerConfig,
+            final PipelineChannelFactory pipelineChannelFactory, final ExecuteEngine incrementalDumperExecuteEngine) {
         this.concurrency = concurrency;
         this.dumperConfig = dumperConfig;
         this.importerConfig = importerConfig;
+        this.pipelineChannelFactory = pipelineChannelFactory;
         this.incrementalDumperExecuteEngine = incrementalDumperExecuteEngine;
         dataSourceManager = new PipelineDataSourceManager();
         taskId = dumperConfig.getDataSourceName();
@@ -100,7 +106,7 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements
     }
     
     private void instanceChannel(final Collection<Importer> importers) {
-        MemoryChannel channel = new MemoryChannel(importers.size(), dumperConfig.getBlockQueueSize(), records -> {
+        Channel channel = pipelineChannelFactory.createPipelineChannel(importers.size(), records -> {
             Record lastHandledRecord = records.get(records.size() - 1);
             if (!(lastHandledRecord.getPosition() instanceof PlaceholderPosition)) {
                 progress.setPosition(lastHandledRecord.getPosition());
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index f9221bf..5c4ecea 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
@@ -31,8 +32,8 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import org.apache.shardingsphere.data.pipeline.core.ingest.channel.distribution.MemoryChannel;
 import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
 import org.apache.shardingsphere.scaling.core.job.dumper.DumperFactory;
 import org.apache.shardingsphere.scaling.core.job.importer.ImporterFactory;
@@ -55,6 +56,8 @@ public final class InventoryTask extends AbstractLifecycleExecutor implements Pi
     
     private final ImporterConfiguration importerConfig;
     
+    private final PipelineChannelFactory pipelineChannelFactory;
+    
     private final ExecuteEngine importerExecuteEngine;
     
     private final PipelineDataSourceManager dataSourceManager;
@@ -63,9 +66,11 @@ public final class InventoryTask extends AbstractLifecycleExecutor implements Pi
     
     private volatile IngestPosition<?> position;
     
-    public InventoryTask(final InventoryDumperConfiguration inventoryDumperConfig, final ImporterConfiguration importerConfig, final ExecuteEngine importerExecuteEngine) {
+    public InventoryTask(final InventoryDumperConfiguration inventoryDumperConfig, final ImporterConfiguration importerConfig,
+                         final PipelineChannelFactory pipelineChannelFactory, final ExecuteEngine importerExecuteEngine) {
         this.inventoryDumperConfig = inventoryDumperConfig;
         this.importerConfig = importerConfig;
+        this.pipelineChannelFactory = pipelineChannelFactory;
         this.importerExecuteEngine = importerExecuteEngine;
         this.dataSourceManager = new PipelineDataSourceManager();
         taskId = generateTaskId(inventoryDumperConfig);
@@ -106,7 +111,7 @@ public final class InventoryTask extends AbstractLifecycleExecutor implements Pi
     }
     
     private void instanceChannel(final Importer importer) {
-        MemoryChannel channel = new MemoryChannel(inventoryDumperConfig.getBlockQueueSize(), records -> {
+        Channel channel = pipelineChannelFactory.createPipelineChannel(1, records -> {
             Optional<Record> record = records.stream().filter(each -> !(each.getPosition() instanceof PlaceholderPosition)).reduce((a, b) -> b);
             record.ifPresent(value -> position = value.getPosition());
         });
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
index 8bb0bdf..dd275ac 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
@@ -21,8 +21,10 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCompletedParameter;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.spi.ingest.channel.MemoryPipelineChannelFactory;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.detect.JobCompletionDetectAlgorithm;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
 import org.apache.shardingsphere.data.pipeline.spi.lock.RowBasedJobLockAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.lock.RuleBasedJobLockAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
@@ -39,6 +41,8 @@ import org.apache.shardingsphere.infra.yaml.config.swapper.rulealtered.OnRuleAlt
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
 
+import java.util.Properties;
+
 /**
  * Rule altered context.
  */
@@ -49,6 +53,7 @@ public final class RuleAlteredContext {
     
     static {
         ShardingSphereServiceLoader.register(JobRateLimitAlgorithm.class);
+        ShardingSphereServiceLoader.register(PipelineChannelFactory.class);
         ShardingSphereServiceLoader.register(JobCompletionDetectAlgorithm.class);
         ShardingSphereServiceLoader.register(RowBasedJobLockAlgorithm.class);
         ShardingSphereServiceLoader.register(DataConsistencyCheckAlgorithm.class);
@@ -65,6 +70,8 @@ public final class RuleAlteredContext {
     
     private final JobRateLimitAlgorithm outputRateLimitAlgorithm;
     
+    private final PipelineChannelFactory pipelineChannelFactory;
+    
     private final JobCompletionDetectAlgorithm<RuleAlteredJobAlmostCompletedParameter> completionDetectAlgorithm;
     
     private final RowBasedJobLockAlgorithm sourceWritingStopAlgorithm;
@@ -93,6 +100,11 @@ public final class RuleAlteredContext {
         }
         ShardingSphereAlgorithmConfiguration outputRateLimiter = outputConfig.getRateLimiter();
         outputRateLimitAlgorithm = null != outputRateLimiter ? ShardingSphereAlgorithmFactory.createAlgorithm(outputRateLimiter, JobRateLimitAlgorithm.class) : null;
+        ShardingSphereAlgorithmConfiguration streamChannel = onRuleAlteredActionConfig.getStreamChannel();
+        if (null == streamChannel) {
+            streamChannel = new ShardingSphereAlgorithmConfiguration(MemoryPipelineChannelFactory.TYPE, new Properties());
+        }
+        pipelineChannelFactory = ShardingSphereAlgorithmFactory.createAlgorithm(streamChannel, PipelineChannelFactory.class);
         ShardingSphereAlgorithmConfiguration completionDetector = onRuleAlteredActionConfig.getCompletionDetector();
         completionDetectAlgorithm = null != completionDetector ? ShardingSphereAlgorithmFactory.createAlgorithm(completionDetector, JobCompletionDetectAlgorithm.class) : null;
         sourceWritingStopAlgorithm = null;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 04c707b..480d6c5 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.data.pipeline.core.prepare.InventoryTaskSplitte
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
 import org.apache.shardingsphere.data.pipeline.spi.rulealtered.DataSourcePreparer;
 import org.apache.shardingsphere.scaling.core.job.check.EnvironmentCheckerFactory;
@@ -110,20 +111,22 @@ public final class RuleAlteredJobPreparer {
     }
     
     private void initInventoryTasks(final RuleAlteredJobContext jobContext, final PipelineDataSourceManager dataSourceManager) {
+        PipelineChannelFactory pipelineChannelFactory = jobContext.getRuleAlteredContext().getPipelineChannelFactory();
         ExecuteEngine importerExecuteEngine = jobContext.getRuleAlteredContext().getImporterExecuteEngine();
         List<InventoryTask> allInventoryTasks = new LinkedList<>();
         for (TaskConfiguration each : jobContext.getTaskConfigs()) {
-            allInventoryTasks.addAll(inventoryTaskSplitter.splitInventoryData(jobContext, each, dataSourceManager, importerExecuteEngine));
+            allInventoryTasks.addAll(inventoryTaskSplitter.splitInventoryData(jobContext, each, dataSourceManager, pipelineChannelFactory, importerExecuteEngine));
         }
         jobContext.getInventoryTasks().addAll(allInventoryTasks);
     }
     
     private void initIncrementalTasks(final RuleAlteredJobContext jobContext, final PipelineDataSourceManager dataSourceManager) throws SQLException {
+        PipelineChannelFactory pipelineChannelFactory = jobContext.getRuleAlteredContext().getPipelineChannelFactory();
         ExecuteEngine incrementalDumperExecuteEngine = jobContext.getRuleAlteredContext().getIncrementalDumperExecuteEngine();
         for (TaskConfiguration each : jobContext.getTaskConfigs()) {
             each.getDumperConfig().setPosition(getIncrementalPosition(jobContext, each, dataSourceManager));
             jobContext.getIncrementalTasks().add(new IncrementalTask(each.getHandleConfig().getConcurrency(),
-                    each.getDumperConfig(), each.getImporterConfig(), incrementalDumperExecuteEngine));
+                    each.getDumperConfig(), each.getImporterConfig(), pipelineChannelFactory, incrementalDumperExecuteEngine));
         }
     }
     
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory
similarity index 59%
copy from shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory
index bb8027d..7c0c3d6 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory
@@ -15,29 +15,4 @@
 # limitations under the License.
 #
 
-scalingName: default_scaling
-scaling:
-  default_scaling:
-    blockQueueSize: 10000
-    input:
-      workerThread: 40
-      batchSize: 1000
-      rateLimiter:
-        type: QPS
-        props:
-          qps: 50
-    output:
-      workerThread: 40
-      batchSize: 1000
-      rateLimiter:
-        type: TPS
-        props:
-          tps: 2000
-    completionDetector:
-      type: IDLE
-      props:
-        incremental-task-idle-minute-threshold: 30
-    dataConsistencyChecker:
-      type: DATA_MATCH
-      props:
-        chunk-size: 1000
+org.apache.shardingsphere.data.pipeline.core.spi.ingest.channel.MemoryPipelineChannelFactory
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index def627a..da8b6a1 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -199,9 +199,6 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
     }
     
     private void pushRecord(final Record record) {
-        try {
-            channel.pushRecord(record);
-        } catch (final InterruptedException ignored) {
-        }
+        channel.pushRecord(record);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
index 504da0e..eb1d197 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
@@ -130,10 +130,7 @@ public final class OpenGaussWalDumper extends AbstractLifecycleExecutor implemen
     }
     
     private void pushRecord(final Record record) {
-        try {
-            channel.pushRecord(record);
-        } catch (final InterruptedException ignored) {
-        }
+        channel.pushRecord(record);
     }
 }
 
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
index 20c046a..fcd6a4c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
@@ -100,10 +100,7 @@ public final class PostgreSQLWalDumper extends AbstractLifecycleExecutor impleme
     }
     
     private void pushRecord(final Record record) {
-        try {
-            channel.pushRecord(record);
-        } catch (final InterruptedException ignored) {
-        }
+        channel.pushRecord(record);
     }
 }
 
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
index 8792f0d..2d6525d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
@@ -37,8 +37,6 @@ public class DumperConfiguration {
     
     private PipelineDataSourceConfiguration dataSourceConfig;
     
-    private int blockQueueSize = 10000;
-    
     private IngestPosition<?> position;
     
     /**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
index 4bf773c..52716cc 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
@@ -44,6 +44,5 @@ public final class InventoryDumperConfiguration extends DumperConfiguration {
         setDataSourceName(dumperConfig.getDataSourceName());
         setDataSourceConfig(dumperConfig.getDataSourceConfig());
         setTableNameMap(dumperConfig.getTableNameMap());
-        setBlockQueueSize(dumperConfig.getBlockQueueSize());
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/AckCallback.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/AckCallback.java
similarity index 94%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/AckCallback.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/AckCallback.java
index 5a3bd0e..5446d34 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/AckCallback.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/AckCallback.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.ingest.channel;
+package org.apache.shardingsphere.data.pipeline.api.ingest.channel;
 
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/Channel.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/Channel.java
index ccb1847..51cd0f0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/Channel.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/Channel.java
@@ -24,24 +24,25 @@ import java.util.List;
 /**
  * Channel.
  */
+// TODO rename to PipelineChannel
 public interface Channel {
     
     /**
-     * push a {@code DataRecord} to channel.
+     * Push {@code DataRecord} into channel.
      *
      * @param dataRecord data
-     * @throws InterruptedException if thread interrupted
      */
-    void pushRecord(Record dataRecord) throws InterruptedException;
+    void pushRecord(Record dataRecord);
     
     /**
-     * fetch {@code Record} from channel, if the timeout also returns the record.
+     * Fetch {@code Record} list from channel.
+     * It may block for at most {@code timeoutSeconds} seconds if the number of available records has not reached the batch size.
      *
      * @param batchSize record batch size
-     * @param timeout timeout(seconds)
+     * @param timeoutSeconds timeout(seconds)
      * @return record
      */
-    List<Record> fetchRecords(int batchSize, int timeout);
+    List<Record> fetchRecords(int batchSize, int timeoutSeconds);
     
     /**
      * Ack the last batch.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/AckCallback.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelFactory.java
similarity index 51%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/AckCallback.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelFactory.java
index 5a3bd0e..22277e8 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/AckCallback.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelFactory.java
@@ -15,21 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.ingest.channel;
+package org.apache.shardingsphere.data.pipeline.spi.ingest.channel;
 
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-
-import java.util.List;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.AckCallback;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithm;
+import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmPostProcessor;
 
 /**
- * Record acknowledged callback.
+ * Pipeline channel factory, SPI.
  */
-public interface AckCallback {
+public interface PipelineChannelFactory extends ShardingSphereAlgorithm, ShardingSphereAlgorithmPostProcessor {
     
     /**
-     * Call after record acknowledged.
+     * Create pipeline channel.
      *
-     * @param records acknowledged record list
+     * @param outputConcurrency output concurrency
+     * @param ackCallback ack callback
+     * @return {@link Channel}
      */
-    void onAck(List<Record> records);
+    Channel createPipelineChannel(int outputConcurrency, AckCallback ackCallback);
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
index efdaa29..0ee1c67 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
+++ b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
@@ -97,7 +97,6 @@
 #  scalingName: default_scaling
 #  scaling:
 #    default_scaling:
-#      blockQueueSize: 10000
 #      input:
 #        workerThread: 40
 #        batchSize: 1000
@@ -112,6 +111,10 @@
 #          type: TPS
 #          props:
 #            tps: 2000
+#      streamChannel:
+#        type: MEMORY
+#        props:
+#          block-queue-size: 10000
 #      completionDetector:
 #        type: IDLE
 #        props:
@@ -202,7 +205,6 @@
 #  scalingName: default_scaling
 #  scaling:
 #    default_scaling:
-#      blockQueueSize: 10000
 #      input:
 #        workerThread: 40
 #        batchSize: 1000
@@ -217,6 +219,10 @@
 #          type: TPS
 #          props:
 #            tps: 2000
+#      streamChannel:
+#        type: MEMORY
+#        props:
+#          block-queue-size: 10000
 #      completionDetector:
 #        type: IDLE
 #        props:
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index 2c8c92c..78dfafc 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -129,12 +129,12 @@ public final class GovernanceRepositoryAPIImplTest {
         dumperConfig.setTableName("t_order");
         dumperConfig.setPrimaryKey("order_id");
         dumperConfig.setShardingItem(0);
-        return new InventoryTask(dumperConfig, taskConfig.getImporterConfig(), RuleAlteredContextUtil.getExecuteEngine());
+        return new InventoryTask(dumperConfig, taskConfig.getImporterConfig(), RuleAlteredContextUtil.getPipelineChannelFactory(), RuleAlteredContextUtil.getExecuteEngine());
     }
     
     private IncrementalTask mockIncrementalTask(final TaskConfiguration taskConfig) {
         DumperConfiguration dumperConfig = taskConfig.getDumperConfig();
         dumperConfig.setPosition(new PlaceholderPosition());
-        return new IncrementalTask(3, dumperConfig, taskConfig.getImporterConfig(), RuleAlteredContextUtil.getExecuteEngine());
+        return new IncrementalTask(3, dumperConfig, taskConfig.getImporterConfig(), RuleAlteredContextUtil.getPipelineChannelFactory(), RuleAlteredContextUtil.getExecuteEngine());
     }
 }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryChannelTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryChannelTest.java
index 55318b7..96965a9 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryChannelTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryChannelTest.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.core.ingest.channel.AckCallback;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.AckCallback;
 import org.junit.Test;
 
 import java.util.Arrays;
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index 13ad21f..be7c39c 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -21,10 +21,12 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPosition;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.core.util.ResourceUtil;
 import org.apache.shardingsphere.data.pipeline.core.util.RuleAlteredContextUtil;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -41,6 +43,10 @@ import static org.junit.Assert.assertThat;
 
 public final class InventoryTaskSplitterTest {
     
+    private static final ExecuteEngine EXECUTE_ENGINE = RuleAlteredContextUtil.getExecuteEngine();
+    
+    private static final PipelineChannelFactory PIPELINE_CHANNEL_FACTORY = RuleAlteredContextUtil.getPipelineChannelFactory();
+    
     private RuleAlteredJobContext jobContext;
     
     private TaskConfiguration taskConfig;
@@ -60,7 +66,7 @@ public final class InventoryTaskSplitterTest {
     public void assertSplitInventoryDataWithEmptyTable() throws SQLException {
         taskConfig.getHandleConfig().setShardingSize(10);
         initEmptyTablePrimaryEnvironment(taskConfig.getDumperConfig());
-        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, taskConfig, dataSourceManager, RuleAlteredContextUtil.getExecuteEngine());
+        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, taskConfig, dataSourceManager, PIPELINE_CHANNEL_FACTORY, EXECUTE_ENGINE);
         assertNotNull(actual);
         assertThat(actual.size(), is(1));
         assertThat(((PrimaryKeyPosition) actual.get(0).getProgress().getPosition()).getBeginValue(), is(0L));
@@ -71,7 +77,7 @@ public final class InventoryTaskSplitterTest {
     public void assertSplitInventoryDataWithIntPrimary() throws SQLException {
         taskConfig.getHandleConfig().setShardingSize(10);
         initIntPrimaryEnvironment(taskConfig.getDumperConfig());
-        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, taskConfig, dataSourceManager, RuleAlteredContextUtil.getExecuteEngine());
+        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, taskConfig, dataSourceManager, PIPELINE_CHANNEL_FACTORY, EXECUTE_ENGINE);
         assertNotNull(actual);
         assertThat(actual.size(), is(10));
         assertThat(((PrimaryKeyPosition) actual.get(9).getProgress().getPosition()).getBeginValue(), is(91L));
@@ -81,7 +87,7 @@ public final class InventoryTaskSplitterTest {
     @Test
     public void assertSplitInventoryDataWithCharPrimary() throws SQLException {
         initCharPrimaryEnvironment(taskConfig.getDumperConfig());
-        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, taskConfig, dataSourceManager, RuleAlteredContextUtil.getExecuteEngine());
+        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, taskConfig, dataSourceManager, PIPELINE_CHANNEL_FACTORY, EXECUTE_ENGINE);
         assertNotNull(actual);
         assertThat(actual.size(), is(1));
     }
@@ -89,7 +95,7 @@ public final class InventoryTaskSplitterTest {
     @Test
     public void assertSplitInventoryDataWithUnionPrimary() throws SQLException {
         initUnionPrimaryEnvironment(taskConfig.getDumperConfig());
-        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, taskConfig, dataSourceManager, RuleAlteredContextUtil.getExecuteEngine());
+        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, taskConfig, dataSourceManager, PIPELINE_CHANNEL_FACTORY, EXECUTE_ENGINE);
         assertNotNull(actual);
         assertThat(actual.size(), is(1));
     }
@@ -97,7 +103,7 @@ public final class InventoryTaskSplitterTest {
     @Test
     public void assertSplitInventoryDataWithoutPrimary() throws SQLException {
         initNoPrimaryEnvironment(taskConfig.getDumperConfig());
-        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, taskConfig, dataSourceManager, RuleAlteredContextUtil.getExecuteEngine());
+        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, taskConfig, dataSourceManager, PIPELINE_CHANNEL_FACTORY, EXECUTE_ENGINE);
         assertNotNull(actual);
         assertThat(actual.size(), is(1));
     }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index 96b06c4..c684563 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -44,7 +44,8 @@ public final class IncrementalTaskTest {
     public void setUp() {
         TaskConfiguration taskConfig = new RuleAlteredJobContext(ResourceUtil.mockJobConfig()).getTaskConfigs().iterator().next();
         taskConfig.getDumperConfig().setPosition(new PlaceholderPosition());
-        incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(), taskConfig.getImporterConfig(), RuleAlteredContextUtil.getExecuteEngine());
+        incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
+                RuleAlteredContextUtil.getPipelineChannelFactory(), RuleAlteredContextUtil.getExecuteEngine());
     }
     
     @Test
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index b27029b..3072163 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -57,7 +57,8 @@ public final class InventoryTaskTest {
             position = new PrimaryKeyPosition(0, 1000);
         }
         inventoryDumperConfig.setPosition(position);
-        try (InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(), RuleAlteredContextUtil.getExecuteEngine())) {
+        try (InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
+                RuleAlteredContextUtil.getPipelineChannelFactory(), RuleAlteredContextUtil.getExecuteEngine())) {
             inventoryTask.start();
         }
     }
@@ -72,7 +73,8 @@ public final class InventoryTaskTest {
             position = new PrimaryKeyPosition(0, 1000);
         }
         inventoryDumperConfig.setPosition(position);
-        try (InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(), RuleAlteredContextUtil.getExecuteEngine())) {
+        try (InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
+                RuleAlteredContextUtil.getPipelineChannelFactory(), RuleAlteredContextUtil.getExecuteEngine())) {
             inventoryTask.start();
             assertFalse(inventoryTask.getProgress().getPosition() instanceof FinishedPosition);
         }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/RuleAlteredContextUtil.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/RuleAlteredContextUtil.java
index 0fc684c..cf81516 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/RuleAlteredContextUtil.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/RuleAlteredContextUtil.java
@@ -23,7 +23,9 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Shardi
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.fixture.EmbedTestingServer;
+import org.apache.shardingsphere.data.pipeline.core.spi.ingest.channel.MemoryPipelineChannelFactory;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
 import org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
 import org.apache.shardingsphere.mode.manager.ContextManager;
@@ -36,6 +38,8 @@ public final class RuleAlteredContextUtil {
     
     private static final ExecuteEngine EXECUTE_ENGINE = ExecuteEngine.newCachedThreadInstance();
     
+    private static final PipelineChannelFactory PIPELINE_CHANNEL_FACTORY = new MemoryPipelineChannelFactory();
+    
     /**
      * Mock mode configuration.
      */
@@ -74,4 +78,13 @@ public final class RuleAlteredContextUtil {
     public static ExecuteEngine getExecuteEngine() {
         return EXECUTE_ENGINE;
     }
+    
+    /**
+     * Get pipeline channel factory.
+     *
+     * @return channel factory
+     */
+    public static PipelineChannelFactory getPipelineChannelFactory() {
+        return PIPELINE_CHANNEL_FACTORY;
+    }
 }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_source.yaml b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_source.yaml
index e6e492a..55d7c32 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_source.yaml
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_source.yaml
@@ -51,7 +51,6 @@ rules:
   scalingName: default_scaling
   scaling:
     default_scaling:
-      blockQueueSize: 10000
       input:
         workerThread: 40
         batchSize: 1000
@@ -66,6 +65,10 @@ rules:
           type: TPS
           props:
             tps: 2000
+      streamChannel:
+        type: MEMORY
+        props:
+          block-queue-size: 10000
       completionDetector:
         type: FIXTURE
       dataConsistencyChecker:
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_target.yaml b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_target.yaml
index b569b7d..02c7e6c 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_target.yaml
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_target.yaml
@@ -54,7 +54,6 @@ rules:
   scalingName: default_scaling
   scaling:
     default_scaling:
-      blockQueueSize: 10000
       input:
         workerThread: 40
         batchSize: 1000
@@ -69,6 +68,10 @@ rules:
           type: TPS
           props:
             tps: 2000
+      streamChannel:
+        type: MEMORY
+        props:
+          block-queue-size: 10000
       completionDetector:
         type: FIXTURE
       dataConsistencyChecker: