You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain text copy.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/08/18 11:22:31 UTC

[shardingsphere] branch master updated: Rename PipelineProcessConfiguration related input and output to read and write (#20261)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new de9ad7e8f07 Rename PipelineProcessConfiguration related input and output to read and write (#20261)
de9ad7e8f07 is described below

commit de9ad7e8f0757416c150c37b1f3ede31b4183ba9
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Thu Aug 18 19:22:20 2022 +0800

    Rename PipelineProcessConfiguration related input and output to read and write (#20261)
    
    * Rename PipelineProcessConfiguration related input and output to read and write
    
    * Continue rename
---
 ...hardingRuleAlteredJobConfigurationPreparer.java |  2 +-
 .../ShardingScalingRuleStatementConverter.java     | 16 ++++-----
 .../ShardingScalingRulesQueryResultSetTest.java    |  8 ++---
 .../pipeline/PipelineProcessConfiguration.java     |  4 +--
 ...uration.java => PipelineReadConfiguration.java} |  4 +--
 ...ration.java => PipelineWriteConfiguration.java} |  4 +--
 .../OnRuleAlteredActionConfiguration.java          |  8 ++---
 .../pipeline/YamlPipelineProcessConfiguration.java |  4 +--
 ...ion.java => YamlPipelineReadConfiguration.java} | 10 +++---
 ...on.java => YamlPipelineWriteConfiguration.java} | 10 +++---
 .../rule/YamlOnRuleAlteredActionConfiguration.java |  8 ++---
 .../YamlPipelineProcessConfigurationSwapper.java   | 12 +++----
 ...a => YamlPipelineReadConfigurationSwapper.java} | 16 ++++-----
 ... => YamlPipelineWriteConfigurationSwapper.java} | 16 ++++-----
 ...amlOnRuleAlteredActionConfigurationSwapper.java | 16 ++++-----
 ...amlPipelineProcessConfigurationSwapperTest.java | 12 +++----
 ...nRuleAlteredActionConfigurationSwapperTest.java |  8 ++---
 .../api/context/PipelineProcessContext.java        |  6 ++--
 .../check/consistency/DataConsistencyChecker.java  |  6 ++--
 .../context/AbstractPipelineProcessContext.java    | 40 +++++++++++-----------
 .../core/prepare/InventoryTaskSplitter.java        | 10 +++---
 21 files changed, 110 insertions(+), 110 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
index 36d60d5262b..e611ae7c40f 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
@@ -239,7 +239,7 @@ public final class ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
     private static ImporterConfiguration createImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
                                                                      final Map<LogicTableName, Set<String>> shardingColumnsMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
         PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
-        int batchSize = pipelineProcessConfig.getOutput().getBatchSize();
+        int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
         int retryTimes = jobConfig.getRetryTimes();
         int concurrency = jobConfig.getConcurrency();
         return new ImporterConfiguration(dataSourceConfig, unmodifiable(shardingColumnsMap), tableNameSchemaNameMapping, batchSize, retryTimes, concurrency);
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/converter/ShardingScalingRuleStatementConverter.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/converter/ShardingScalingRuleStatementConverter.java
index 09ecd1520f2..4dcf5416bfd 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/converter/ShardingScalingRuleStatementConverter.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/converter/ShardingScalingRuleStatementConverter.java
@@ -21,8 +21,8 @@ import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
 import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
-import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
-import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
+import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineReadConfiguration;
+import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineWriteConfiguration;
 import org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
 import org.apache.shardingsphere.migration.distsql.statement.segment.InputOrOutputSegment;
 import org.apache.shardingsphere.migration.distsql.statement.segment.ShardingScalingRuleConfigurationSegment;
@@ -40,26 +40,26 @@ public final class ShardingScalingRuleStatementConverter {
      * @return on rule altered action configuration
      */
     public static OnRuleAlteredActionConfiguration convert(final ShardingScalingRuleConfigurationSegment segment) {
-        PipelineInputConfiguration inputConfig = convertToInputConfiguration(segment.getInputSegment());
-        PipelineOutputConfiguration outputConfig = convertToOutputConfiguration(segment.getOutputSegment());
+        PipelineReadConfiguration inputConfig = convertToInputConfiguration(segment.getInputSegment());
+        PipelineWriteConfiguration outputConfig = convertToOutputConfiguration(segment.getOutputSegment());
         AlgorithmConfiguration streamChannel = convertToAlgorithm(segment.getStreamChannel());
         AlgorithmConfiguration completionDetector = convertToAlgorithm(segment.getCompletionDetector());
         AlgorithmConfiguration dataConsistencyChecker = convertToAlgorithm(segment.getDataConsistencyCalculator());
         return new OnRuleAlteredActionConfiguration(inputConfig, outputConfig, streamChannel, completionDetector, dataConsistencyChecker);
     }
     
-    private static PipelineInputConfiguration convertToInputConfiguration(final InputOrOutputSegment inputSegment) {
+    private static PipelineReadConfiguration convertToInputConfiguration(final InputOrOutputSegment inputSegment) {
         if (null == inputSegment) {
             return null;
         }
-        return new PipelineInputConfiguration(inputSegment.getWorkerThread(), inputSegment.getBatchSize(), inputSegment.getShardingSize(), convertToAlgorithm(inputSegment.getRateLimiter()));
+        return new PipelineReadConfiguration(inputSegment.getWorkerThread(), inputSegment.getBatchSize(), inputSegment.getShardingSize(), convertToAlgorithm(inputSegment.getRateLimiter()));
     }
     
-    private static PipelineOutputConfiguration convertToOutputConfiguration(final InputOrOutputSegment outputSegment) {
+    private static PipelineWriteConfiguration convertToOutputConfiguration(final InputOrOutputSegment outputSegment) {
         if (null == outputSegment) {
             return null;
         }
-        return new PipelineOutputConfiguration(outputSegment.getWorkerThread(), outputSegment.getBatchSize(), convertToAlgorithm(outputSegment.getRateLimiter()));
+        return new PipelineWriteConfiguration(outputSegment.getWorkerThread(), outputSegment.getBatchSize(), convertToAlgorithm(outputSegment.getRateLimiter()));
     }
     
     private static AlgorithmConfiguration convertToAlgorithm(final AlgorithmSegment segment) {
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShardingScalingRulesQueryResultSetTest.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShardingScalingRulesQueryResultSetTest.java
index 8b37b741c0f..5ba69104f48 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShardingScalingRulesQueryResultSetTest.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShardingScalingRulesQueryResultSetTest.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.scaling.distsql.handler.query;
 
 import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
 import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
-import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
-import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
+import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineReadConfiguration;
+import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineWriteConfiguration;
 import org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.migration.distsql.handler.query.ShardingScalingRulesQueryResultSet;
@@ -82,8 +82,8 @@ public final class ShardingScalingRulesQueryResultSetTest {
     }
     
     private OnRuleAlteredActionConfiguration buildCompleteConfiguration() {
-        PipelineInputConfiguration inputConfig = new PipelineInputConfiguration(10, 100, 10, new AlgorithmConfiguration("QPS", createProperties("qps", "50")));
-        PipelineOutputConfiguration outputConfig = new PipelineOutputConfiguration(10, 100, new AlgorithmConfiguration("TPS", createProperties("tps", "2000")));
+        PipelineReadConfiguration inputConfig = new PipelineReadConfiguration(10, 100, 10, new AlgorithmConfiguration("QPS", createProperties("qps", "50")));
+        PipelineWriteConfiguration outputConfig = new PipelineWriteConfiguration(10, 100, new AlgorithmConfiguration("TPS", createProperties("tps", "2000")));
         AlgorithmConfiguration streamChannel = new AlgorithmConfiguration("MEMORY", createProperties("block-queue-size", "10000"));
         AlgorithmConfiguration completionDetector = new AlgorithmConfiguration("IDLE", createProperties("incremental-task-idle-seconds-threshold", "1800"));
         AlgorithmConfiguration dataConsistencyChecker = new AlgorithmConfiguration("DATA_MATCH", createProperties("chunk-size", "1000"));
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineProcessConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineProcessConfiguration.java
index 928960d3026..a9932adafda 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineProcessConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineProcessConfiguration.java
@@ -30,9 +30,9 @@ import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
 @ToString
 public final class PipelineProcessConfiguration {
     
-    private final PipelineInputConfiguration input;
+    private final PipelineReadConfiguration read;
     
-    private final PipelineOutputConfiguration output;
+    private final PipelineWriteConfiguration write;
     
     private final AlgorithmConfiguration streamChannel;
 }
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineInputConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineReadConfiguration.java
similarity index 94%
rename from shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineInputConfiguration.java
rename to shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineReadConfiguration.java
index d723b011eec..207af4c44e6 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineInputConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineReadConfiguration.java
@@ -23,12 +23,12 @@ import lombok.ToString;
 import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
 
 /**
- * Pipeline input configuration.
+ * Pipeline read configuration.
  */
 @RequiredArgsConstructor
 @Getter
 @ToString
-public final class PipelineInputConfiguration {
+public final class PipelineReadConfiguration {
     
     private final Integer workerThread;
     
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineOutputConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineWriteConfiguration.java
similarity index 93%
rename from shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineOutputConfiguration.java
rename to shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineWriteConfiguration.java
index af617327bc1..e9ff81a53d0 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineOutputConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineWriteConfiguration.java
@@ -23,12 +23,12 @@ import lombok.ToString;
 import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
 
 /**
- * Pipeline output configuration.
+ * Pipeline write configuration.
  */
 @RequiredArgsConstructor
 @Getter
 @ToString
-public final class PipelineOutputConfiguration {
+public final class PipelineWriteConfiguration {
     
     private final Integer workerThread;
     
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/rulealtered/OnRuleAlteredActionConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/rulealtered/OnRuleAlteredActionConfiguration.java
index bcc6dd0451a..a7f77e8de44 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/rulealtered/OnRuleAlteredActionConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/rulealtered/OnRuleAlteredActionConfiguration.java
@@ -21,8 +21,8 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
-import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
-import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
+import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineReadConfiguration;
+import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineWriteConfiguration;
 
 /**
  * On rule altered action configuration.
@@ -32,9 +32,9 @@ import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputC
 @ToString
 public final class OnRuleAlteredActionConfiguration {
     
-    private final PipelineInputConfiguration input;
+    private final PipelineReadConfiguration input;
     
-    private final PipelineOutputConfiguration output;
+    private final PipelineWriteConfiguration output;
     
     private final AlgorithmConfiguration streamChannel;
     
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
index 71bb47943e6..3cd18b1864e 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
@@ -31,9 +31,9 @@ import org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmC
 @ToString
 public final class YamlPipelineProcessConfiguration implements YamlConfiguration {
     
-    private YamlPipelineInputConfiguration input;
+    private YamlPipelineReadConfiguration read;
     
-    private YamlPipelineOutputConfiguration output;
+    private YamlPipelineWriteConfiguration write;
     
     private YamlAlgorithmConfiguration streamChannel;
 }
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineInputConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineReadConfiguration.java
similarity index 87%
rename from shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineInputConfiguration.java
rename to shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineReadConfiguration.java
index 3c7ffd0bbc2..4e13aabca34 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineInputConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineReadConfiguration.java
@@ -22,10 +22,10 @@ import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 import org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
 
 /**
- * YAML pipeline input configuration.
+ * YAML pipeline read configuration.
  */
 @Data
-public final class YamlPipelineInputConfiguration implements YamlConfiguration {
+public final class YamlPipelineReadConfiguration implements YamlConfiguration {
     
     private static final Integer DEFAULT_WORKER_THREAD = 40;
     
@@ -44,10 +44,10 @@ public final class YamlPipelineInputConfiguration implements YamlConfiguration {
     /**
      * Build with default value.
      *
-     * @return input configuration
+     * @return read configuration
      */
-    public static YamlPipelineInputConfiguration buildWithDefaultValue() {
-        return new YamlPipelineInputConfiguration();
+    public static YamlPipelineReadConfiguration buildWithDefaultValue() {
+        return new YamlPipelineReadConfiguration();
     }
     
     /**
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineOutputConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineWriteConfiguration.java
similarity index 85%
rename from shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineOutputConfiguration.java
rename to shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineWriteConfiguration.java
index 578357cd1d0..fb5460ae6b6 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineOutputConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineWriteConfiguration.java
@@ -22,10 +22,10 @@ import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 import org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
 
 /**
- * YAML pipeline output configuration.
+ * YAML pipeline write configuration.
  */
 @Data
-public final class YamlPipelineOutputConfiguration implements YamlConfiguration {
+public final class YamlPipelineWriteConfiguration implements YamlConfiguration {
     
     private static final Integer DEFAULT_WORKER_THREAD = 40;
     
@@ -40,10 +40,10 @@ public final class YamlPipelineOutputConfiguration implements YamlConfiguration
     /**
      * Build with default value.
      *
-     * @return output configuration
+     * @return write configuration
      */
-    public static YamlPipelineOutputConfiguration buildWithDefaultValue() {
-        return new YamlPipelineOutputConfiguration();
+    public static YamlPipelineWriteConfiguration buildWithDefaultValue() {
+        return new YamlPipelineWriteConfiguration();
     }
     
     /**
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rule/YamlOnRuleAlteredActionConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rule/YamlOnRuleAlteredActionConfiguration.java
index 2c742bc0d60..93cda75fdca 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rule/YamlOnRuleAlteredActionConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rule/YamlOnRuleAlteredActionConfiguration.java
@@ -22,8 +22,8 @@ import lombok.Setter;
 import lombok.ToString;
 import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 import org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
-import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
-import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineReadConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineWriteConfiguration;
 
 /**
  * YAML on rule altered action configuration.
@@ -33,9 +33,9 @@ import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipeli
 @ToString
 public final class YamlOnRuleAlteredActionConfiguration implements YamlConfiguration {
     
-    private YamlPipelineInputConfiguration input;
+    private YamlPipelineReadConfiguration input;
     
-    private YamlPipelineOutputConfiguration output;
+    private YamlPipelineWriteConfiguration output;
     
     private YamlAlgorithmConfiguration streamChannel;
     
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapper.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapper.java
index 16154b6f2fc..7c1f7cf4950 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapper.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapper.java
@@ -29,9 +29,9 @@ public final class YamlPipelineProcessConfigurationSwapper implements YamlConfig
     
     private static final YamlAlgorithmConfigurationSwapper ALGORITHM_CONFIG_SWAPPER = new YamlAlgorithmConfigurationSwapper();
     
-    private static final YamlPipelineInputConfigurationSwapper INPUT_CONFIG_SWAPPER = new YamlPipelineInputConfigurationSwapper();
+    private static final YamlPipelineReadConfigurationSwapper READ_CONFIG_SWAPPER = new YamlPipelineReadConfigurationSwapper();
     
-    private static final YamlPipelineOutputConfigurationSwapper OUTPUT_CONFIG_SWAPPER = new YamlPipelineOutputConfigurationSwapper();
+    private static final YamlPipelineWriteConfigurationSwapper WRITE_CONFIG_SWAPPER = new YamlPipelineWriteConfigurationSwapper();
     
     @Override
     public YamlPipelineProcessConfiguration swapToYamlConfiguration(final PipelineProcessConfiguration data) {
@@ -39,8 +39,8 @@ public final class YamlPipelineProcessConfigurationSwapper implements YamlConfig
             return null;
         }
         YamlPipelineProcessConfiguration result = new YamlPipelineProcessConfiguration();
-        result.setInput(INPUT_CONFIG_SWAPPER.swapToYamlConfiguration(data.getInput()));
-        result.setOutput(OUTPUT_CONFIG_SWAPPER.swapToYamlConfiguration(data.getOutput()));
+        result.setRead(READ_CONFIG_SWAPPER.swapToYamlConfiguration(data.getRead()));
+        result.setWrite(WRITE_CONFIG_SWAPPER.swapToYamlConfiguration(data.getWrite()));
         result.setStreamChannel(ALGORITHM_CONFIG_SWAPPER.swapToYamlConfiguration(data.getStreamChannel()));
         return result;
     }
@@ -51,8 +51,8 @@ public final class YamlPipelineProcessConfigurationSwapper implements YamlConfig
             return null;
         }
         return new PipelineProcessConfiguration(
-                INPUT_CONFIG_SWAPPER.swapToObject(yamlConfig.getInput()),
-                OUTPUT_CONFIG_SWAPPER.swapToObject(yamlConfig.getOutput()),
+                READ_CONFIG_SWAPPER.swapToObject(yamlConfig.getRead()),
+                WRITE_CONFIG_SWAPPER.swapToObject(yamlConfig.getWrite()),
                 ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getStreamChannel()));
     }
 }
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineInputConfigurationSwapper.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineReadConfigurationSwapper.java
similarity index 73%
rename from shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineInputConfigurationSwapper.java
rename to shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineReadConfigurationSwapper.java
index ba705d67a15..28e60dc3eee 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineInputConfigurationSwapper.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineReadConfigurationSwapper.java
@@ -18,25 +18,25 @@
 package org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline;
 
 import lombok.Data;
-import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
-import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
+import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineReadConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineReadConfiguration;
 import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
 import org.apache.shardingsphere.infra.yaml.config.swapper.algorithm.YamlAlgorithmConfigurationSwapper;
 
 /**
- * YAML pipeline input configuration swapper.
+ * YAML pipeline read configuration swapper.
  */
 @Data
-public final class YamlPipelineInputConfigurationSwapper implements YamlConfigurationSwapper<YamlPipelineInputConfiguration, PipelineInputConfiguration> {
+public final class YamlPipelineReadConfigurationSwapper implements YamlConfigurationSwapper<YamlPipelineReadConfiguration, PipelineReadConfiguration> {
     
     private static final YamlAlgorithmConfigurationSwapper ALGORITHM_CONFIG_SWAPPER = new YamlAlgorithmConfigurationSwapper();
     
     @Override
-    public YamlPipelineInputConfiguration swapToYamlConfiguration(final PipelineInputConfiguration data) {
+    public YamlPipelineReadConfiguration swapToYamlConfiguration(final PipelineReadConfiguration data) {
         if (null == data) {
             return null;
         }
-        YamlPipelineInputConfiguration result = new YamlPipelineInputConfiguration();
+        YamlPipelineReadConfiguration result = new YamlPipelineReadConfiguration();
         result.setWorkerThread(data.getWorkerThread());
         result.setBatchSize(data.getBatchSize());
         result.setShardingSize(data.getShardingSize());
@@ -45,10 +45,10 @@ public final class YamlPipelineInputConfigurationSwapper implements YamlConfigur
     }
     
     @Override
-    public PipelineInputConfiguration swapToObject(final YamlPipelineInputConfiguration yamlConfig) {
+    public PipelineReadConfiguration swapToObject(final YamlPipelineReadConfiguration yamlConfig) {
         return null == yamlConfig
                 ? null
-                : new PipelineInputConfiguration(yamlConfig.getWorkerThread(), yamlConfig.getBatchSize(), yamlConfig.getShardingSize(),
+                : new PipelineReadConfiguration(yamlConfig.getWorkerThread(), yamlConfig.getBatchSize(), yamlConfig.getShardingSize(),
                         ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getRateLimiter()));
     }
 }
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineOutputConfigurationSwapper.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineWriteConfigurationSwapper.java
similarity index 70%
rename from shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineOutputConfigurationSwapper.java
rename to shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineWriteConfigurationSwapper.java
index de24408ea96..a86c8b4e4f0 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineOutputConfigurationSwapper.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineWriteConfigurationSwapper.java
@@ -18,25 +18,25 @@
 package org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline;
 
 import lombok.Data;
-import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
-import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
+import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineWriteConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineWriteConfiguration;
 import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
 import org.apache.shardingsphere.infra.yaml.config.swapper.algorithm.YamlAlgorithmConfigurationSwapper;
 
 /**
- * YAML pipeline output configuration swapper.
+ * YAML pipeline write configuration swapper.
  */
 @Data
-public final class YamlPipelineOutputConfigurationSwapper implements YamlConfigurationSwapper<YamlPipelineOutputConfiguration, PipelineOutputConfiguration> {
+public final class YamlPipelineWriteConfigurationSwapper implements YamlConfigurationSwapper<YamlPipelineWriteConfiguration, PipelineWriteConfiguration> {
     
     private static final YamlAlgorithmConfigurationSwapper ALGORITHM_CONFIG_SWAPPER = new YamlAlgorithmConfigurationSwapper();
     
     @Override
-    public YamlPipelineOutputConfiguration swapToYamlConfiguration(final PipelineOutputConfiguration data) {
+    public YamlPipelineWriteConfiguration swapToYamlConfiguration(final PipelineWriteConfiguration data) {
         if (null == data) {
             return null;
         }
-        YamlPipelineOutputConfiguration result = new YamlPipelineOutputConfiguration();
+        YamlPipelineWriteConfiguration result = new YamlPipelineWriteConfiguration();
         result.setWorkerThread(data.getWorkerThread());
         result.setBatchSize(data.getBatchSize());
         result.setRateLimiter(ALGORITHM_CONFIG_SWAPPER.swapToYamlConfiguration(data.getRateLimiter()));
@@ -44,9 +44,9 @@ public final class YamlPipelineOutputConfigurationSwapper implements YamlConfigu
     }
     
     @Override
-    public PipelineOutputConfiguration swapToObject(final YamlPipelineOutputConfiguration yamlConfig) {
+    public PipelineWriteConfiguration swapToObject(final YamlPipelineWriteConfiguration yamlConfig) {
         return null == yamlConfig
                 ? null
-                : new PipelineOutputConfiguration(yamlConfig.getWorkerThread(), yamlConfig.getBatchSize(), ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getRateLimiter()));
+                : new PipelineWriteConfiguration(yamlConfig.getWorkerThread(), yamlConfig.getBatchSize(), ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getRateLimiter()));
     }
 }
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapper.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapper.java
index f5deaeaba24..99ccbade6fa 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapper.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapper.java
@@ -21,8 +21,8 @@ import org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActi
 import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlOnRuleAlteredActionConfiguration;
 import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
 import org.apache.shardingsphere.infra.yaml.config.swapper.algorithm.YamlAlgorithmConfigurationSwapper;
-import org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineOutputConfigurationSwapper;
-import org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineInputConfigurationSwapper;
+import org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineWriteConfigurationSwapper;
+import org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineReadConfigurationSwapper;
 
 /**
  * YAML on rule altered action configuration swapper.
@@ -31,9 +31,9 @@ public final class YamlOnRuleAlteredActionConfigurationSwapper implements YamlCo
     
     private static final YamlAlgorithmConfigurationSwapper ALGORITHM_CONFIG_SWAPPER = new YamlAlgorithmConfigurationSwapper();
     
-    private static final YamlPipelineInputConfigurationSwapper INPUT_CONFIG_SWAPPER = new YamlPipelineInputConfigurationSwapper();
+    private static final YamlPipelineReadConfigurationSwapper READ_CONFIG_SWAPPER = new YamlPipelineReadConfigurationSwapper();
     
-    private static final YamlPipelineOutputConfigurationSwapper OUTPUT_CONFIG_SWAPPER = new YamlPipelineOutputConfigurationSwapper();
+    private static final YamlPipelineWriteConfigurationSwapper WRITE_CONFIG_SWAPPER = new YamlPipelineWriteConfigurationSwapper();
     
     @Override
     public YamlOnRuleAlteredActionConfiguration swapToYamlConfiguration(final OnRuleAlteredActionConfiguration data) {
@@ -41,8 +41,8 @@ public final class YamlOnRuleAlteredActionConfigurationSwapper implements YamlCo
             return null;
         }
         YamlOnRuleAlteredActionConfiguration result = new YamlOnRuleAlteredActionConfiguration();
-        result.setInput(INPUT_CONFIG_SWAPPER.swapToYamlConfiguration(data.getInput()));
-        result.setOutput(OUTPUT_CONFIG_SWAPPER.swapToYamlConfiguration(data.getOutput()));
+        result.setInput(READ_CONFIG_SWAPPER.swapToYamlConfiguration(data.getInput()));
+        result.setOutput(WRITE_CONFIG_SWAPPER.swapToYamlConfiguration(data.getOutput()));
         result.setStreamChannel(ALGORITHM_CONFIG_SWAPPER.swapToYamlConfiguration(data.getStreamChannel()));
         result.setCompletionDetector(ALGORITHM_CONFIG_SWAPPER.swapToYamlConfiguration(data.getCompletionDetector()));
         result.setDataConsistencyChecker(ALGORITHM_CONFIG_SWAPPER.swapToYamlConfiguration(data.getDataConsistencyCalculator()));
@@ -55,8 +55,8 @@ public final class YamlOnRuleAlteredActionConfigurationSwapper implements YamlCo
             return null;
         }
         return new OnRuleAlteredActionConfiguration(
-                INPUT_CONFIG_SWAPPER.swapToObject(yamlConfig.getInput()),
-                OUTPUT_CONFIG_SWAPPER.swapToObject(yamlConfig.getOutput()),
+                READ_CONFIG_SWAPPER.swapToObject(yamlConfig.getInput()),
+                WRITE_CONFIG_SWAPPER.swapToObject(yamlConfig.getOutput()),
                 ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getStreamChannel()),
                 ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getCompletionDetector()),
                 ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getDataConsistencyChecker()));
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapperTest.java b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapperTest.java
index 8558f0197c1..1fc13fa0082 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapperTest.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapperTest.java
@@ -20,8 +20,8 @@ package org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline;
 import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
-import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
-import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineReadConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineWriteConfiguration;
 import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineProcessConfiguration;
 import org.junit.Test;
 
@@ -41,12 +41,12 @@ public final class YamlPipelineProcessConfigurationSwapperTest {
         Properties rateLimiterProps = new Properties();
         rateLimiterProps.setProperty("batch-size", "1000");
         rateLimiterProps.setProperty("qps", "50");
-        YamlPipelineInputConfiguration yamlInputConfig = YamlPipelineInputConfiguration.buildWithDefaultValue();
-        yamlConfig.setInput(yamlInputConfig);
+        YamlPipelineReadConfiguration yamlInputConfig = YamlPipelineReadConfiguration.buildWithDefaultValue();
+        yamlConfig.setRead(yamlInputConfig);
         yamlInputConfig.setRateLimiter(new YamlAlgorithmConfiguration("INPUT", rateLimiterProps));
-        YamlPipelineOutputConfiguration yamlOutputConfig = YamlPipelineOutputConfiguration.buildWithDefaultValue();
+        YamlPipelineWriteConfiguration yamlOutputConfig = YamlPipelineWriteConfiguration.buildWithDefaultValue();
         yamlOutputConfig.setRateLimiter(new YamlAlgorithmConfiguration("OUTPUT", rateLimiterProps));
-        yamlConfig.setOutput(yamlOutputConfig);
+        yamlConfig.setWrite(yamlOutputConfig);
         Properties streamChannelProps = new Properties();
         streamChannelProps.setProperty("block-queue-size", "10000");
         yamlConfig.setStreamChannel(new YamlAlgorithmConfiguration("MEMORY", streamChannelProps));
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapperTest.java b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapperTest.java
index 42c6f5a4e9e..2cc32ed1c3a 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapperTest.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapperTest.java
@@ -20,8 +20,8 @@ package org.apache.shardingsphere.infra.yaml.config.swapper.rule.rulealtered;
 import org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
-import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
-import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineReadConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineWriteConfiguration;
 import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlOnRuleAlteredActionConfiguration;
 import org.junit.Test;
 
@@ -41,10 +41,10 @@ public final class YamlOnRuleAlteredActionConfigurationSwapperTest {
         Properties rateLimiterProps = new Properties();
         rateLimiterProps.setProperty("batch-size", "1000");
         rateLimiterProps.setProperty("qps", "50");
-        YamlPipelineInputConfiguration yamlInputConfig = YamlPipelineInputConfiguration.buildWithDefaultValue();
+        YamlPipelineReadConfiguration yamlInputConfig = YamlPipelineReadConfiguration.buildWithDefaultValue();
         yamlConfig.setInput(yamlInputConfig);
         yamlInputConfig.setRateLimiter(new YamlAlgorithmConfiguration("INPUT", rateLimiterProps));
-        YamlPipelineOutputConfiguration yamlOutputConfig = YamlPipelineOutputConfiguration.buildWithDefaultValue();
+        YamlPipelineWriteConfiguration yamlOutputConfig = YamlPipelineWriteConfiguration.buildWithDefaultValue();
         yamlOutputConfig.setRateLimiter(new YamlAlgorithmConfiguration("OUTPUT", rateLimiterProps));
         yamlConfig.setOutput(yamlOutputConfig);
         Properties streamChannelProps = new Properties();
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
index a0138e7ba4b..ab015372778 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
@@ -41,9 +41,9 @@ public interface PipelineProcessContext {
     PipelineChannelCreator getPipelineChannelCreator();
     
     /**
-     * Get job input rate limit algorithm.
+     * Get job read rate limit algorithm.
      *
-     * @return job input rate limit algorithm
+     * @return job read rate limit algorithm
      */
-    JobRateLimitAlgorithm getInputRateLimitAlgorithm();
+    JobRateLimitAlgorithm getReadRateLimitAlgorithm();
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index 3a18c77820c..08f2e2a0631 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -157,7 +157,7 @@ public final class DataConsistencyChecker {
         PipelineDataSourceConfiguration targetDataSourceConfig = jobConfig.getTarget();
         ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(jobConfig.getJobId()) + "-data-check-%d");
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
-        JobRateLimitAlgorithm inputRateLimitAlgorithm = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig).getInputRateLimitAlgorithm();
+        JobRateLimitAlgorithm readRateLimitAlgorithm = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig).getReadRateLimitAlgorithm();
         Map<String, DataConsistencyContentCheckResult> result = new HashMap<>(logicTableNames.size(), 1);
         try (
                 PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
@@ -177,8 +177,8 @@ public final class DataConsistencyChecker {
                 Iterator<Object> targetCalculatedResults = calculator.calculate(targetParameter).iterator();
                 boolean contentMatched = true;
                 while (sourceCalculatedResults.hasNext() && targetCalculatedResults.hasNext()) {
-                    if (null != inputRateLimitAlgorithm) {
-                        inputRateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
+                    if (null != readRateLimitAlgorithm) {
+                        readRateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
                     }
                     Future<Object> sourceFuture = executor.submit(sourceCalculatedResults::next);
                     Future<Object> targetFuture = executor.submit(targetCalculatedResults::next);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
index 4178c1dbea0..774ca9e6145 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
@@ -30,12 +30,12 @@ import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChanne
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithmFactory;
 import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
-import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
-import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
+import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineReadConfiguration;
+import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineWriteConfiguration;
 import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
 import org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
-import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
-import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineReadConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineWriteConfiguration;
 import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineProcessConfiguration;
 import org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineProcessConfigurationSwapper;
 
@@ -52,9 +52,9 @@ public abstract class AbstractPipelineProcessContext implements PipelineProcessC
     
     private final PipelineProcessConfiguration pipelineProcessConfig;
     
-    private final JobRateLimitAlgorithm inputRateLimitAlgorithm;
+    private final JobRateLimitAlgorithm readRateLimitAlgorithm;
     
-    private final JobRateLimitAlgorithm outputRateLimitAlgorithm;
+    private final JobRateLimitAlgorithm writeRateLimitAlgorithm;
     
     private final PipelineChannelCreator pipelineChannelCreator;
     
@@ -67,19 +67,19 @@ public abstract class AbstractPipelineProcessContext implements PipelineProcessC
     public AbstractPipelineProcessContext(final String jobId, final PipelineProcessConfiguration originalProcessConfig) {
         PipelineProcessConfiguration processConfig = convertProcessConfig(originalProcessConfig);
         this.pipelineProcessConfig = processConfig;
-        PipelineInputConfiguration inputConfig = processConfig.getInput();
-        AlgorithmConfiguration inputRateLimiter = inputConfig.getRateLimiter();
-        inputRateLimitAlgorithm = null != inputRateLimiter ? JobRateLimitAlgorithmFactory.newInstance(inputRateLimiter) : null;
-        PipelineOutputConfiguration outputConfig = processConfig.getOutput();
-        AlgorithmConfiguration outputRateLimiter = outputConfig.getRateLimiter();
-        outputRateLimitAlgorithm = null != outputRateLimiter ? JobRateLimitAlgorithmFactory.newInstance(outputRateLimiter) : null;
+        PipelineReadConfiguration readConfig = processConfig.getRead();
+        AlgorithmConfiguration readRateLimiter = readConfig.getRateLimiter();
+        readRateLimitAlgorithm = null != readRateLimiter ? JobRateLimitAlgorithmFactory.newInstance(readRateLimiter) : null;
+        PipelineWriteConfiguration writeConfig = processConfig.getWrite();
+        AlgorithmConfiguration writeRateLimiter = writeConfig.getRateLimiter();
+        writeRateLimitAlgorithm = null != writeRateLimiter ? JobRateLimitAlgorithmFactory.newInstance(writeRateLimiter) : null;
         AlgorithmConfiguration streamChannel = processConfig.getStreamChannel();
         pipelineChannelCreator = PipelineChannelCreatorFactory.newInstance(streamChannel);
         inventoryDumperExecuteEngineLazyInitializer = new LazyInitializer<ExecuteEngine>() {
             
             @Override
             protected ExecuteEngine initialize() {
-                return ExecuteEngine.newFixedThreadInstance(inputConfig.getWorkerThread(), "Inventory-" + jobId);
+                return ExecuteEngine.newFixedThreadInstance(readConfig.getWorkerThread(), "Inventory-" + jobId);
             }
         };
         incrementalDumperExecuteEngineLazyInitializer = new LazyInitializer<ExecuteEngine>() {
@@ -93,22 +93,22 @@ public abstract class AbstractPipelineProcessContext implements PipelineProcessC
             
             @Override
             protected ExecuteEngine initialize() {
-                return ExecuteEngine.newFixedThreadInstance(outputConfig.getWorkerThread(), "Importer-" + jobId);
+                return ExecuteEngine.newFixedThreadInstance(writeConfig.getWorkerThread(), "Importer-" + jobId);
             }
         };
     }
     
     private PipelineProcessConfiguration convertProcessConfig(final PipelineProcessConfiguration originalProcessConfig) {
         YamlPipelineProcessConfiguration yamlActionConfig = SWAPPER.swapToYamlConfiguration(originalProcessConfig);
-        if (null == yamlActionConfig.getInput()) {
-            yamlActionConfig.setInput(YamlPipelineInputConfiguration.buildWithDefaultValue());
+        if (null == yamlActionConfig.getRead()) {
+            yamlActionConfig.setRead(YamlPipelineReadConfiguration.buildWithDefaultValue());
         } else {
-            yamlActionConfig.getInput().fillInNullFieldsWithDefaultValue();
+            yamlActionConfig.getRead().fillInNullFieldsWithDefaultValue();
         }
-        if (null == yamlActionConfig.getOutput()) {
-            yamlActionConfig.setOutput(YamlPipelineOutputConfiguration.buildWithDefaultValue());
+        if (null == yamlActionConfig.getWrite()) {
+            yamlActionConfig.setWrite(YamlPipelineWriteConfiguration.buildWithDefaultValue());
         } else {
-            yamlActionConfig.getOutput().fillInNullFieldsWithDefaultValue();
+            yamlActionConfig.getWrite().fillInNullFieldsWithDefaultValue();
         }
         if (null == yamlActionConfig.getStreamChannel()) {
             yamlActionConfig.setStreamChannel(new YamlAlgorithmConfiguration(MemoryPipelineChannelCreator.TYPE, new Properties()));
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 072ea2756bf..7769110cd9d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -48,7 +48,7 @@ import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
+import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineReadConfiguration;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -120,9 +120,9 @@ public final class InventoryTaskSplitter {
                                                                        final InventoryDumperConfiguration dumperConfig) {
         Collection<InventoryDumperConfiguration> result = new LinkedList<>();
         PipelineProcessContext ruleAlteredContext = jobItemContext.getJobProcessContext();
-        PipelineInputConfiguration inputConfig = ruleAlteredContext.getPipelineProcessConfig().getInput();
-        int batchSize = inputConfig.getBatchSize();
-        JobRateLimitAlgorithm rateLimitAlgorithm = ruleAlteredContext.getInputRateLimitAlgorithm();
+        PipelineReadConfiguration readConfig = ruleAlteredContext.getPipelineProcessConfig().getRead();
+        int batchSize = readConfig.getBatchSize();
+        JobRateLimitAlgorithm rateLimitAlgorithm = ruleAlteredContext.getReadRateLimitAlgorithm();
         Collection<IngestPosition<?>> inventoryPositions = getInventoryPositions(jobItemContext, dumperConfig, dataSource, metaDataLoader);
         int i = 0;
         for (IngestPosition<?> inventoryPosition : inventoryPositions) {
@@ -201,7 +201,7 @@ public final class InventoryTaskSplitter {
         PipelineJobConfiguration jobConfig = jobItemContext.getJobConfig();
         String sql = PipelineSQLBuilderFactory.getInstance(jobConfig.getSourceDatabaseType())
                 .buildSplitByPrimaryKeyRangeSQL(dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName())), dumperConfig.getActualTableName(), dumperConfig.getUniqueKey());
-        int shardingSize = jobItemContext.getJobProcessContext().getPipelineProcessConfig().getInput().getShardingSize();
+        int shardingSize = jobItemContext.getJobProcessContext().getPipelineProcessConfig().getRead().getShardingSize();
         try (
                 Connection connection = dataSource.getConnection();
                 PreparedStatement ps = connection.prepareStatement(sql)) {