You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/08/18 11:22:31 UTC
[shardingsphere] branch master updated: Rename PipelineProcessConfiguration related input and output to read and write (#20261)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new de9ad7e8f07 Rename PipelineProcessConfiguration related input and output to read and write (#20261)
de9ad7e8f07 is described below
commit de9ad7e8f0757416c150c37b1f3ede31b4183ba9
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Thu Aug 18 19:22:20 2022 +0800
Rename PipelineProcessConfiguration related input and output to read and write (#20261)
* Rename PipelineProcessConfiguration related input and output to read and write
* Continue renaming
---
...hardingRuleAlteredJobConfigurationPreparer.java | 2 +-
.../ShardingScalingRuleStatementConverter.java | 16 ++++-----
.../ShardingScalingRulesQueryResultSetTest.java | 8 ++---
.../pipeline/PipelineProcessConfiguration.java | 4 +--
...uration.java => PipelineReadConfiguration.java} | 4 +--
...ration.java => PipelineWriteConfiguration.java} | 4 +--
.../OnRuleAlteredActionConfiguration.java | 8 ++---
.../pipeline/YamlPipelineProcessConfiguration.java | 4 +--
...ion.java => YamlPipelineReadConfiguration.java} | 10 +++---
...on.java => YamlPipelineWriteConfiguration.java} | 10 +++---
.../rule/YamlOnRuleAlteredActionConfiguration.java | 8 ++---
.../YamlPipelineProcessConfigurationSwapper.java | 12 +++----
...a => YamlPipelineReadConfigurationSwapper.java} | 16 ++++-----
... => YamlPipelineWriteConfigurationSwapper.java} | 16 ++++-----
...amlOnRuleAlteredActionConfigurationSwapper.java | 16 ++++-----
...amlPipelineProcessConfigurationSwapperTest.java | 12 +++----
...nRuleAlteredActionConfigurationSwapperTest.java | 8 ++---
.../api/context/PipelineProcessContext.java | 6 ++--
.../check/consistency/DataConsistencyChecker.java | 6 ++--
.../context/AbstractPipelineProcessContext.java | 40 +++++++++++-----------
.../core/prepare/InventoryTaskSplitter.java | 10 +++---
21 files changed, 110 insertions(+), 110 deletions(-)
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
index 36d60d5262b..e611ae7c40f 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
@@ -239,7 +239,7 @@ public final class ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
private static ImporterConfiguration createImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
final Map<LogicTableName, Set<String>> shardingColumnsMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
- int batchSize = pipelineProcessConfig.getOutput().getBatchSize();
+ int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
int retryTimes = jobConfig.getRetryTimes();
int concurrency = jobConfig.getConcurrency();
return new ImporterConfiguration(dataSourceConfig, unmodifiable(shardingColumnsMap), tableNameSchemaNameMapping, batchSize, retryTimes, concurrency);
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/converter/ShardingScalingRuleStatementConverter.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/converter/ShardingScalingRuleStatementConverter.java
index 09ecd1520f2..4dcf5416bfd 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/converter/ShardingScalingRuleStatementConverter.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/converter/ShardingScalingRuleStatementConverter.java
@@ -21,8 +21,8 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
-import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
-import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
+import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineReadConfiguration;
+import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineWriteConfiguration;
import org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
import org.apache.shardingsphere.migration.distsql.statement.segment.InputOrOutputSegment;
import org.apache.shardingsphere.migration.distsql.statement.segment.ShardingScalingRuleConfigurationSegment;
@@ -40,26 +40,26 @@ public final class ShardingScalingRuleStatementConverter {
* @return on rule altered action configuration
*/
public static OnRuleAlteredActionConfiguration convert(final ShardingScalingRuleConfigurationSegment segment) {
- PipelineInputConfiguration inputConfig = convertToInputConfiguration(segment.getInputSegment());
- PipelineOutputConfiguration outputConfig = convertToOutputConfiguration(segment.getOutputSegment());
+ PipelineReadConfiguration inputConfig = convertToInputConfiguration(segment.getInputSegment());
+ PipelineWriteConfiguration outputConfig = convertToOutputConfiguration(segment.getOutputSegment());
AlgorithmConfiguration streamChannel = convertToAlgorithm(segment.getStreamChannel());
AlgorithmConfiguration completionDetector = convertToAlgorithm(segment.getCompletionDetector());
AlgorithmConfiguration dataConsistencyChecker = convertToAlgorithm(segment.getDataConsistencyCalculator());
return new OnRuleAlteredActionConfiguration(inputConfig, outputConfig, streamChannel, completionDetector, dataConsistencyChecker);
}
- private static PipelineInputConfiguration convertToInputConfiguration(final InputOrOutputSegment inputSegment) {
+ private static PipelineReadConfiguration convertToInputConfiguration(final InputOrOutputSegment inputSegment) {
if (null == inputSegment) {
return null;
}
- return new PipelineInputConfiguration(inputSegment.getWorkerThread(), inputSegment.getBatchSize(), inputSegment.getShardingSize(), convertToAlgorithm(inputSegment.getRateLimiter()));
+ return new PipelineReadConfiguration(inputSegment.getWorkerThread(), inputSegment.getBatchSize(), inputSegment.getShardingSize(), convertToAlgorithm(inputSegment.getRateLimiter()));
}
- private static PipelineOutputConfiguration convertToOutputConfiguration(final InputOrOutputSegment outputSegment) {
+ private static PipelineWriteConfiguration convertToOutputConfiguration(final InputOrOutputSegment outputSegment) {
if (null == outputSegment) {
return null;
}
- return new PipelineOutputConfiguration(outputSegment.getWorkerThread(), outputSegment.getBatchSize(), convertToAlgorithm(outputSegment.getRateLimiter()));
+ return new PipelineWriteConfiguration(outputSegment.getWorkerThread(), outputSegment.getBatchSize(), convertToAlgorithm(outputSegment.getRateLimiter()));
}
private static AlgorithmConfiguration convertToAlgorithm(final AlgorithmSegment segment) {
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShardingScalingRulesQueryResultSetTest.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShardingScalingRulesQueryResultSetTest.java
index 8b37b741c0f..5ba69104f48 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShardingScalingRulesQueryResultSetTest.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShardingScalingRulesQueryResultSetTest.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.scaling.distsql.handler.query;
import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
-import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
-import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
+import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineReadConfiguration;
+import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineWriteConfiguration;
import org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.migration.distsql.handler.query.ShardingScalingRulesQueryResultSet;
@@ -82,8 +82,8 @@ public final class ShardingScalingRulesQueryResultSetTest {
}
private OnRuleAlteredActionConfiguration buildCompleteConfiguration() {
- PipelineInputConfiguration inputConfig = new PipelineInputConfiguration(10, 100, 10, new AlgorithmConfiguration("QPS", createProperties("qps", "50")));
- PipelineOutputConfiguration outputConfig = new PipelineOutputConfiguration(10, 100, new AlgorithmConfiguration("TPS", createProperties("tps", "2000")));
+ PipelineReadConfiguration inputConfig = new PipelineReadConfiguration(10, 100, 10, new AlgorithmConfiguration("QPS", createProperties("qps", "50")));
+ PipelineWriteConfiguration outputConfig = new PipelineWriteConfiguration(10, 100, new AlgorithmConfiguration("TPS", createProperties("tps", "2000")));
AlgorithmConfiguration streamChannel = new AlgorithmConfiguration("MEMORY", createProperties("block-queue-size", "10000"));
AlgorithmConfiguration completionDetector = new AlgorithmConfiguration("IDLE", createProperties("incremental-task-idle-seconds-threshold", "1800"));
AlgorithmConfiguration dataConsistencyChecker = new AlgorithmConfiguration("DATA_MATCH", createProperties("chunk-size", "1000"));
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineProcessConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineProcessConfiguration.java
index 928960d3026..a9932adafda 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineProcessConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineProcessConfiguration.java
@@ -30,9 +30,9 @@ import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
@ToString
public final class PipelineProcessConfiguration {
- private final PipelineInputConfiguration input;
+ private final PipelineReadConfiguration read;
- private final PipelineOutputConfiguration output;
+ private final PipelineWriteConfiguration write;
private final AlgorithmConfiguration streamChannel;
}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineInputConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineReadConfiguration.java
similarity index 94%
rename from shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineInputConfiguration.java
rename to shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineReadConfiguration.java
index d723b011eec..207af4c44e6 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineInputConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineReadConfiguration.java
@@ -23,12 +23,12 @@ import lombok.ToString;
import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
/**
- * Pipeline input configuration.
+ * Pipeline read configuration.
*/
@RequiredArgsConstructor
@Getter
@ToString
-public final class PipelineInputConfiguration {
+public final class PipelineReadConfiguration {
private final Integer workerThread;
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineOutputConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineWriteConfiguration.java
similarity index 93%
rename from shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineOutputConfiguration.java
rename to shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineWriteConfiguration.java
index af617327bc1..e9ff81a53d0 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineOutputConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineWriteConfiguration.java
@@ -23,12 +23,12 @@ import lombok.ToString;
import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
/**
- * Pipeline output configuration.
+ * Pipeline write configuration.
*/
@RequiredArgsConstructor
@Getter
@ToString
-public final class PipelineOutputConfiguration {
+public final class PipelineWriteConfiguration {
private final Integer workerThread;
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/rulealtered/OnRuleAlteredActionConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/rulealtered/OnRuleAlteredActionConfiguration.java
index bcc6dd0451a..a7f77e8de44 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/rulealtered/OnRuleAlteredActionConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/rulealtered/OnRuleAlteredActionConfiguration.java
@@ -21,8 +21,8 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
-import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
-import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
+import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineReadConfiguration;
+import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineWriteConfiguration;
/**
* On rule altered action configuration.
@@ -32,9 +32,9 @@ import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputC
@ToString
public final class OnRuleAlteredActionConfiguration {
- private final PipelineInputConfiguration input;
+ private final PipelineReadConfiguration input;
- private final PipelineOutputConfiguration output;
+ private final PipelineWriteConfiguration output;
private final AlgorithmConfiguration streamChannel;
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
index 71bb47943e6..3cd18b1864e 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
@@ -31,9 +31,9 @@ import org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmC
@ToString
public final class YamlPipelineProcessConfiguration implements YamlConfiguration {
- private YamlPipelineInputConfiguration input;
+ private YamlPipelineReadConfiguration read;
- private YamlPipelineOutputConfiguration output;
+ private YamlPipelineWriteConfiguration write;
private YamlAlgorithmConfiguration streamChannel;
}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineInputConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineReadConfiguration.java
similarity index 87%
rename from shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineInputConfiguration.java
rename to shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineReadConfiguration.java
index 3c7ffd0bbc2..4e13aabca34 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineInputConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineReadConfiguration.java
@@ -22,10 +22,10 @@ import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
import org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
/**
- * YAML pipeline input configuration.
+ * YAML pipeline read configuration.
*/
@Data
-public final class YamlPipelineInputConfiguration implements YamlConfiguration {
+public final class YamlPipelineReadConfiguration implements YamlConfiguration {
private static final Integer DEFAULT_WORKER_THREAD = 40;
@@ -44,10 +44,10 @@ public final class YamlPipelineInputConfiguration implements YamlConfiguration {
/**
* Build with default value.
*
- * @return input configuration
+ * @return read configuration
*/
- public static YamlPipelineInputConfiguration buildWithDefaultValue() {
- return new YamlPipelineInputConfiguration();
+ public static YamlPipelineReadConfiguration buildWithDefaultValue() {
+ return new YamlPipelineReadConfiguration();
}
/**
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineOutputConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineWriteConfiguration.java
similarity index 85%
rename from shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineOutputConfiguration.java
rename to shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineWriteConfiguration.java
index 578357cd1d0..fb5460ae6b6 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineOutputConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineWriteConfiguration.java
@@ -22,10 +22,10 @@ import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
import org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
/**
- * YAML pipeline output configuration.
+ * YAML pipeline write configuration.
*/
@Data
-public final class YamlPipelineOutputConfiguration implements YamlConfiguration {
+public final class YamlPipelineWriteConfiguration implements YamlConfiguration {
private static final Integer DEFAULT_WORKER_THREAD = 40;
@@ -40,10 +40,10 @@ public final class YamlPipelineOutputConfiguration implements YamlConfiguration
/**
* Build with default value.
*
- * @return output configuration
+ * @return write configuration
*/
- public static YamlPipelineOutputConfiguration buildWithDefaultValue() {
- return new YamlPipelineOutputConfiguration();
+ public static YamlPipelineWriteConfiguration buildWithDefaultValue() {
+ return new YamlPipelineWriteConfiguration();
}
/**
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rule/YamlOnRuleAlteredActionConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rule/YamlOnRuleAlteredActionConfiguration.java
index 2c742bc0d60..93cda75fdca 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rule/YamlOnRuleAlteredActionConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rule/YamlOnRuleAlteredActionConfiguration.java
@@ -22,8 +22,8 @@ import lombok.Setter;
import lombok.ToString;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
import org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
-import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
-import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineReadConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineWriteConfiguration;
/**
* YAML on rule altered action configuration.
@@ -33,9 +33,9 @@ import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipeli
@ToString
public final class YamlOnRuleAlteredActionConfiguration implements YamlConfiguration {
- private YamlPipelineInputConfiguration input;
+ private YamlPipelineReadConfiguration input;
- private YamlPipelineOutputConfiguration output;
+ private YamlPipelineWriteConfiguration output;
private YamlAlgorithmConfiguration streamChannel;
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapper.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapper.java
index 16154b6f2fc..7c1f7cf4950 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapper.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapper.java
@@ -29,9 +29,9 @@ public final class YamlPipelineProcessConfigurationSwapper implements YamlConfig
private static final YamlAlgorithmConfigurationSwapper ALGORITHM_CONFIG_SWAPPER = new YamlAlgorithmConfigurationSwapper();
- private static final YamlPipelineInputConfigurationSwapper INPUT_CONFIG_SWAPPER = new YamlPipelineInputConfigurationSwapper();
+ private static final YamlPipelineReadConfigurationSwapper READ_CONFIG_SWAPPER = new YamlPipelineReadConfigurationSwapper();
- private static final YamlPipelineOutputConfigurationSwapper OUTPUT_CONFIG_SWAPPER = new YamlPipelineOutputConfigurationSwapper();
+ private static final YamlPipelineWriteConfigurationSwapper WRITE_CONFIG_SWAPPER = new YamlPipelineWriteConfigurationSwapper();
@Override
public YamlPipelineProcessConfiguration swapToYamlConfiguration(final PipelineProcessConfiguration data) {
@@ -39,8 +39,8 @@ public final class YamlPipelineProcessConfigurationSwapper implements YamlConfig
return null;
}
YamlPipelineProcessConfiguration result = new YamlPipelineProcessConfiguration();
- result.setInput(INPUT_CONFIG_SWAPPER.swapToYamlConfiguration(data.getInput()));
- result.setOutput(OUTPUT_CONFIG_SWAPPER.swapToYamlConfiguration(data.getOutput()));
+ result.setRead(READ_CONFIG_SWAPPER.swapToYamlConfiguration(data.getRead()));
+ result.setWrite(WRITE_CONFIG_SWAPPER.swapToYamlConfiguration(data.getWrite()));
result.setStreamChannel(ALGORITHM_CONFIG_SWAPPER.swapToYamlConfiguration(data.getStreamChannel()));
return result;
}
@@ -51,8 +51,8 @@ public final class YamlPipelineProcessConfigurationSwapper implements YamlConfig
return null;
}
return new PipelineProcessConfiguration(
- INPUT_CONFIG_SWAPPER.swapToObject(yamlConfig.getInput()),
- OUTPUT_CONFIG_SWAPPER.swapToObject(yamlConfig.getOutput()),
+ READ_CONFIG_SWAPPER.swapToObject(yamlConfig.getRead()),
+ WRITE_CONFIG_SWAPPER.swapToObject(yamlConfig.getWrite()),
ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getStreamChannel()));
}
}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineInputConfigurationSwapper.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineReadConfigurationSwapper.java
similarity index 73%
rename from shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineInputConfigurationSwapper.java
rename to shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineReadConfigurationSwapper.java
index ba705d67a15..28e60dc3eee 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineInputConfigurationSwapper.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineReadConfigurationSwapper.java
@@ -18,25 +18,25 @@
package org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline;
import lombok.Data;
-import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
-import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
+import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineReadConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineReadConfiguration;
import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
import org.apache.shardingsphere.infra.yaml.config.swapper.algorithm.YamlAlgorithmConfigurationSwapper;
/**
- * YAML pipeline input configuration swapper.
+ * YAML pipeline read configuration swapper.
*/
@Data
-public final class YamlPipelineInputConfigurationSwapper implements YamlConfigurationSwapper<YamlPipelineInputConfiguration, PipelineInputConfiguration> {
+public final class YamlPipelineReadConfigurationSwapper implements YamlConfigurationSwapper<YamlPipelineReadConfiguration, PipelineReadConfiguration> {
private static final YamlAlgorithmConfigurationSwapper ALGORITHM_CONFIG_SWAPPER = new YamlAlgorithmConfigurationSwapper();
@Override
- public YamlPipelineInputConfiguration swapToYamlConfiguration(final PipelineInputConfiguration data) {
+ public YamlPipelineReadConfiguration swapToYamlConfiguration(final PipelineReadConfiguration data) {
if (null == data) {
return null;
}
- YamlPipelineInputConfiguration result = new YamlPipelineInputConfiguration();
+ YamlPipelineReadConfiguration result = new YamlPipelineReadConfiguration();
result.setWorkerThread(data.getWorkerThread());
result.setBatchSize(data.getBatchSize());
result.setShardingSize(data.getShardingSize());
@@ -45,10 +45,10 @@ public final class YamlPipelineInputConfigurationSwapper implements YamlConfigur
}
@Override
- public PipelineInputConfiguration swapToObject(final YamlPipelineInputConfiguration yamlConfig) {
+ public PipelineReadConfiguration swapToObject(final YamlPipelineReadConfiguration yamlConfig) {
return null == yamlConfig
? null
- : new PipelineInputConfiguration(yamlConfig.getWorkerThread(), yamlConfig.getBatchSize(), yamlConfig.getShardingSize(),
+ : new PipelineReadConfiguration(yamlConfig.getWorkerThread(), yamlConfig.getBatchSize(), yamlConfig.getShardingSize(),
ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getRateLimiter()));
}
}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineOutputConfigurationSwapper.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineWriteConfigurationSwapper.java
similarity index 70%
rename from shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineOutputConfigurationSwapper.java
rename to shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineWriteConfigurationSwapper.java
index de24408ea96..a86c8b4e4f0 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineOutputConfigurationSwapper.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineWriteConfigurationSwapper.java
@@ -18,25 +18,25 @@
package org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline;
import lombok.Data;
-import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
-import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
+import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineWriteConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineWriteConfiguration;
import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
import org.apache.shardingsphere.infra.yaml.config.swapper.algorithm.YamlAlgorithmConfigurationSwapper;
/**
- * YAML pipeline output configuration swapper.
+ * YAML pipeline write configuration swapper.
*/
@Data
-public final class YamlPipelineOutputConfigurationSwapper implements YamlConfigurationSwapper<YamlPipelineOutputConfiguration, PipelineOutputConfiguration> {
+public final class YamlPipelineWriteConfigurationSwapper implements YamlConfigurationSwapper<YamlPipelineWriteConfiguration, PipelineWriteConfiguration> {
private static final YamlAlgorithmConfigurationSwapper ALGORITHM_CONFIG_SWAPPER = new YamlAlgorithmConfigurationSwapper();
@Override
- public YamlPipelineOutputConfiguration swapToYamlConfiguration(final PipelineOutputConfiguration data) {
+ public YamlPipelineWriteConfiguration swapToYamlConfiguration(final PipelineWriteConfiguration data) {
if (null == data) {
return null;
}
- YamlPipelineOutputConfiguration result = new YamlPipelineOutputConfiguration();
+ YamlPipelineWriteConfiguration result = new YamlPipelineWriteConfiguration();
result.setWorkerThread(data.getWorkerThread());
result.setBatchSize(data.getBatchSize());
result.setRateLimiter(ALGORITHM_CONFIG_SWAPPER.swapToYamlConfiguration(data.getRateLimiter()));
@@ -44,9 +44,9 @@ public final class YamlPipelineOutputConfigurationSwapper implements YamlConfigu
}
@Override
- public PipelineOutputConfiguration swapToObject(final YamlPipelineOutputConfiguration yamlConfig) {
+ public PipelineWriteConfiguration swapToObject(final YamlPipelineWriteConfiguration yamlConfig) {
return null == yamlConfig
? null
- : new PipelineOutputConfiguration(yamlConfig.getWorkerThread(), yamlConfig.getBatchSize(), ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getRateLimiter()));
+ : new PipelineWriteConfiguration(yamlConfig.getWorkerThread(), yamlConfig.getBatchSize(), ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getRateLimiter()));
}
}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapper.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapper.java
index f5deaeaba24..99ccbade6fa 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapper.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapper.java
@@ -21,8 +21,8 @@ import org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActi
import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlOnRuleAlteredActionConfiguration;
import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
import org.apache.shardingsphere.infra.yaml.config.swapper.algorithm.YamlAlgorithmConfigurationSwapper;
-import org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineOutputConfigurationSwapper;
-import org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineInputConfigurationSwapper;
+import org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineWriteConfigurationSwapper;
+import org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineReadConfigurationSwapper;
/**
* YAML on rule altered action configuration swapper.
@@ -31,9 +31,9 @@ public final class YamlOnRuleAlteredActionConfigurationSwapper implements YamlCo
private static final YamlAlgorithmConfigurationSwapper ALGORITHM_CONFIG_SWAPPER = new YamlAlgorithmConfigurationSwapper();
- private static final YamlPipelineInputConfigurationSwapper INPUT_CONFIG_SWAPPER = new YamlPipelineInputConfigurationSwapper();
+ private static final YamlPipelineReadConfigurationSwapper READ_CONFIG_SWAPPER = new YamlPipelineReadConfigurationSwapper();
- private static final YamlPipelineOutputConfigurationSwapper OUTPUT_CONFIG_SWAPPER = new YamlPipelineOutputConfigurationSwapper();
+ private static final YamlPipelineWriteConfigurationSwapper WRITE_CONFIG_SWAPPER = new YamlPipelineWriteConfigurationSwapper();
@Override
public YamlOnRuleAlteredActionConfiguration swapToYamlConfiguration(final OnRuleAlteredActionConfiguration data) {
@@ -41,8 +41,8 @@ public final class YamlOnRuleAlteredActionConfigurationSwapper implements YamlCo
return null;
}
YamlOnRuleAlteredActionConfiguration result = new YamlOnRuleAlteredActionConfiguration();
- result.setInput(INPUT_CONFIG_SWAPPER.swapToYamlConfiguration(data.getInput()));
- result.setOutput(OUTPUT_CONFIG_SWAPPER.swapToYamlConfiguration(data.getOutput()));
+ result.setInput(READ_CONFIG_SWAPPER.swapToYamlConfiguration(data.getInput()));
+ result.setOutput(WRITE_CONFIG_SWAPPER.swapToYamlConfiguration(data.getOutput()));
result.setStreamChannel(ALGORITHM_CONFIG_SWAPPER.swapToYamlConfiguration(data.getStreamChannel()));
result.setCompletionDetector(ALGORITHM_CONFIG_SWAPPER.swapToYamlConfiguration(data.getCompletionDetector()));
result.setDataConsistencyChecker(ALGORITHM_CONFIG_SWAPPER.swapToYamlConfiguration(data.getDataConsistencyCalculator()));
@@ -55,8 +55,8 @@ public final class YamlOnRuleAlteredActionConfigurationSwapper implements YamlCo
return null;
}
return new OnRuleAlteredActionConfiguration(
- INPUT_CONFIG_SWAPPER.swapToObject(yamlConfig.getInput()),
- OUTPUT_CONFIG_SWAPPER.swapToObject(yamlConfig.getOutput()),
+ READ_CONFIG_SWAPPER.swapToObject(yamlConfig.getInput()),
+ WRITE_CONFIG_SWAPPER.swapToObject(yamlConfig.getOutput()),
ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getStreamChannel()),
ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getCompletionDetector()),
ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getDataConsistencyChecker()));
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapperTest.java b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapperTest.java
index 8558f0197c1..1fc13fa0082 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapperTest.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapperTest.java
@@ -20,8 +20,8 @@ package org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline;
import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
-import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
-import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineReadConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineWriteConfiguration;
import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineProcessConfiguration;
import org.junit.Test;
@@ -41,12 +41,12 @@ public final class YamlPipelineProcessConfigurationSwapperTest {
Properties rateLimiterProps = new Properties();
rateLimiterProps.setProperty("batch-size", "1000");
rateLimiterProps.setProperty("qps", "50");
- YamlPipelineInputConfiguration yamlInputConfig = YamlPipelineInputConfiguration.buildWithDefaultValue();
- yamlConfig.setInput(yamlInputConfig);
+ YamlPipelineReadConfiguration yamlInputConfig = YamlPipelineReadConfiguration.buildWithDefaultValue();
+ yamlConfig.setRead(yamlInputConfig);
yamlInputConfig.setRateLimiter(new YamlAlgorithmConfiguration("INPUT", rateLimiterProps));
- YamlPipelineOutputConfiguration yamlOutputConfig = YamlPipelineOutputConfiguration.buildWithDefaultValue();
+ YamlPipelineWriteConfiguration yamlOutputConfig = YamlPipelineWriteConfiguration.buildWithDefaultValue();
yamlOutputConfig.setRateLimiter(new YamlAlgorithmConfiguration("OUTPUT", rateLimiterProps));
- yamlConfig.setOutput(yamlOutputConfig);
+ yamlConfig.setWrite(yamlOutputConfig);
Properties streamChannelProps = new Properties();
streamChannelProps.setProperty("block-queue-size", "10000");
yamlConfig.setStreamChannel(new YamlAlgorithmConfiguration("MEMORY", streamChannelProps));
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapperTest.java b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapperTest.java
index 42c6f5a4e9e..2cc32ed1c3a 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapperTest.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/rulealtered/YamlOnRuleAlteredActionConfigurationSwapperTest.java
@@ -20,8 +20,8 @@ package org.apache.shardingsphere.infra.yaml.config.swapper.rule.rulealtered;
import org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
-import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
-import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineReadConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineWriteConfiguration;
import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlOnRuleAlteredActionConfiguration;
import org.junit.Test;
@@ -41,10 +41,10 @@ public final class YamlOnRuleAlteredActionConfigurationSwapperTest {
Properties rateLimiterProps = new Properties();
rateLimiterProps.setProperty("batch-size", "1000");
rateLimiterProps.setProperty("qps", "50");
- YamlPipelineInputConfiguration yamlInputConfig = YamlPipelineInputConfiguration.buildWithDefaultValue();
+ YamlPipelineReadConfiguration yamlInputConfig = YamlPipelineReadConfiguration.buildWithDefaultValue();
yamlConfig.setInput(yamlInputConfig);
yamlInputConfig.setRateLimiter(new YamlAlgorithmConfiguration("INPUT", rateLimiterProps));
- YamlPipelineOutputConfiguration yamlOutputConfig = YamlPipelineOutputConfiguration.buildWithDefaultValue();
+ YamlPipelineWriteConfiguration yamlOutputConfig = YamlPipelineWriteConfiguration.buildWithDefaultValue();
yamlOutputConfig.setRateLimiter(new YamlAlgorithmConfiguration("OUTPUT", rateLimiterProps));
yamlConfig.setOutput(yamlOutputConfig);
Properties streamChannelProps = new Properties();
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
index a0138e7ba4b..ab015372778 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/PipelineProcessContext.java
@@ -41,9 +41,9 @@ public interface PipelineProcessContext {
PipelineChannelCreator getPipelineChannelCreator();
/**
- * Get job input rate limit algorithm.
+ * Get job read rate limit algorithm.
*
- * @return job input rate limit algorithm
+ * @return job read rate limit algorithm
*/
- JobRateLimitAlgorithm getInputRateLimitAlgorithm();
+ JobRateLimitAlgorithm getReadRateLimitAlgorithm();
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index 3a18c77820c..08f2e2a0631 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -157,7 +157,7 @@ public final class DataConsistencyChecker {
PipelineDataSourceConfiguration targetDataSourceConfig = jobConfig.getTarget();
ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(jobConfig.getJobId()) + "-data-check-%d");
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
- JobRateLimitAlgorithm inputRateLimitAlgorithm = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig).getInputRateLimitAlgorithm();
+ JobRateLimitAlgorithm readRateLimitAlgorithm = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig).getReadRateLimitAlgorithm();
Map<String, DataConsistencyContentCheckResult> result = new HashMap<>(logicTableNames.size(), 1);
try (
PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
@@ -177,8 +177,8 @@ public final class DataConsistencyChecker {
Iterator<Object> targetCalculatedResults = calculator.calculate(targetParameter).iterator();
boolean contentMatched = true;
while (sourceCalculatedResults.hasNext() && targetCalculatedResults.hasNext()) {
- if (null != inputRateLimitAlgorithm) {
- inputRateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
+ if (null != readRateLimitAlgorithm) {
+ readRateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
}
Future<Object> sourceFuture = executor.submit(sourceCalculatedResults::next);
Future<Object> targetFuture = executor.submit(targetCalculatedResults::next);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
index 4178c1dbea0..774ca9e6145 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
@@ -30,12 +30,12 @@ import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChanne
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithmFactory;
import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
-import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
-import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
+import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineReadConfiguration;
+import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineWriteConfiguration;
import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
import org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
-import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
-import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineReadConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineWriteConfiguration;
import org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineProcessConfiguration;
import org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineProcessConfigurationSwapper;
@@ -52,9 +52,9 @@ public abstract class AbstractPipelineProcessContext implements PipelineProcessC
private final PipelineProcessConfiguration pipelineProcessConfig;
- private final JobRateLimitAlgorithm inputRateLimitAlgorithm;
+ private final JobRateLimitAlgorithm readRateLimitAlgorithm;
- private final JobRateLimitAlgorithm outputRateLimitAlgorithm;
+ private final JobRateLimitAlgorithm writeRateLimitAlgorithm;
private final PipelineChannelCreator pipelineChannelCreator;
@@ -67,19 +67,19 @@ public abstract class AbstractPipelineProcessContext implements PipelineProcessC
public AbstractPipelineProcessContext(final String jobId, final PipelineProcessConfiguration originalProcessConfig) {
PipelineProcessConfiguration processConfig = convertProcessConfig(originalProcessConfig);
this.pipelineProcessConfig = processConfig;
- PipelineInputConfiguration inputConfig = processConfig.getInput();
- AlgorithmConfiguration inputRateLimiter = inputConfig.getRateLimiter();
- inputRateLimitAlgorithm = null != inputRateLimiter ? JobRateLimitAlgorithmFactory.newInstance(inputRateLimiter) : null;
- PipelineOutputConfiguration outputConfig = processConfig.getOutput();
- AlgorithmConfiguration outputRateLimiter = outputConfig.getRateLimiter();
- outputRateLimitAlgorithm = null != outputRateLimiter ? JobRateLimitAlgorithmFactory.newInstance(outputRateLimiter) : null;
+ PipelineReadConfiguration readConfig = processConfig.getRead();
+ AlgorithmConfiguration readRateLimiter = readConfig.getRateLimiter();
+ readRateLimitAlgorithm = null != readRateLimiter ? JobRateLimitAlgorithmFactory.newInstance(readRateLimiter) : null;
+ PipelineWriteConfiguration writeConfig = processConfig.getWrite();
+ AlgorithmConfiguration writeRateLimiter = writeConfig.getRateLimiter();
+ writeRateLimitAlgorithm = null != writeRateLimiter ? JobRateLimitAlgorithmFactory.newInstance(writeRateLimiter) : null;
AlgorithmConfiguration streamChannel = processConfig.getStreamChannel();
pipelineChannelCreator = PipelineChannelCreatorFactory.newInstance(streamChannel);
inventoryDumperExecuteEngineLazyInitializer = new LazyInitializer<ExecuteEngine>() {
@Override
protected ExecuteEngine initialize() {
- return ExecuteEngine.newFixedThreadInstance(inputConfig.getWorkerThread(), "Inventory-" + jobId);
+ return ExecuteEngine.newFixedThreadInstance(readConfig.getWorkerThread(), "Inventory-" + jobId);
}
};
incrementalDumperExecuteEngineLazyInitializer = new LazyInitializer<ExecuteEngine>() {
@@ -93,22 +93,22 @@ public abstract class AbstractPipelineProcessContext implements PipelineProcessC
@Override
protected ExecuteEngine initialize() {
- return ExecuteEngine.newFixedThreadInstance(outputConfig.getWorkerThread(), "Importer-" + jobId);
+ return ExecuteEngine.newFixedThreadInstance(writeConfig.getWorkerThread(), "Importer-" + jobId);
}
};
}
private PipelineProcessConfiguration convertProcessConfig(final PipelineProcessConfiguration originalProcessConfig) {
YamlPipelineProcessConfiguration yamlActionConfig = SWAPPER.swapToYamlConfiguration(originalProcessConfig);
- if (null == yamlActionConfig.getInput()) {
- yamlActionConfig.setInput(YamlPipelineInputConfiguration.buildWithDefaultValue());
+ if (null == yamlActionConfig.getRead()) {
+ yamlActionConfig.setRead(YamlPipelineReadConfiguration.buildWithDefaultValue());
} else {
- yamlActionConfig.getInput().fillInNullFieldsWithDefaultValue();
+ yamlActionConfig.getRead().fillInNullFieldsWithDefaultValue();
}
- if (null == yamlActionConfig.getOutput()) {
- yamlActionConfig.setOutput(YamlPipelineOutputConfiguration.buildWithDefaultValue());
+ if (null == yamlActionConfig.getWrite()) {
+ yamlActionConfig.setWrite(YamlPipelineWriteConfiguration.buildWithDefaultValue());
} else {
- yamlActionConfig.getOutput().fillInNullFieldsWithDefaultValue();
+ yamlActionConfig.getWrite().fillInNullFieldsWithDefaultValue();
}
if (null == yamlActionConfig.getStreamChannel()) {
yamlActionConfig.setStreamChannel(new YamlAlgorithmConfiguration(MemoryPipelineChannelCreator.TYPE, new Properties()));
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 072ea2756bf..7769110cd9d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -48,7 +48,7 @@ import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
+import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineReadConfiguration;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -120,9 +120,9 @@ public final class InventoryTaskSplitter {
final InventoryDumperConfiguration dumperConfig) {
Collection<InventoryDumperConfiguration> result = new LinkedList<>();
PipelineProcessContext ruleAlteredContext = jobItemContext.getJobProcessContext();
- PipelineInputConfiguration inputConfig = ruleAlteredContext.getPipelineProcessConfig().getInput();
- int batchSize = inputConfig.getBatchSize();
- JobRateLimitAlgorithm rateLimitAlgorithm = ruleAlteredContext.getInputRateLimitAlgorithm();
+ PipelineReadConfiguration readConfig = ruleAlteredContext.getPipelineProcessConfig().getRead();
+ int batchSize = readConfig.getBatchSize();
+ JobRateLimitAlgorithm rateLimitAlgorithm = ruleAlteredContext.getReadRateLimitAlgorithm();
Collection<IngestPosition<?>> inventoryPositions = getInventoryPositions(jobItemContext, dumperConfig, dataSource, metaDataLoader);
int i = 0;
for (IngestPosition<?> inventoryPosition : inventoryPositions) {
@@ -201,7 +201,7 @@ public final class InventoryTaskSplitter {
PipelineJobConfiguration jobConfig = jobItemContext.getJobConfig();
String sql = PipelineSQLBuilderFactory.getInstance(jobConfig.getSourceDatabaseType())
.buildSplitByPrimaryKeyRangeSQL(dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName())), dumperConfig.getActualTableName(), dumperConfig.getUniqueKey());
- int shardingSize = jobItemContext.getJobProcessContext().getPipelineProcessConfig().getInput().getShardingSize();
+ int shardingSize = jobItemContext.getJobProcessContext().getPipelineProcessConfig().getRead().getShardingSize();
try (
Connection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement(sql)) {