You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/01/08 11:58:36 UTC
[shardingsphere] branch master updated: Add streamChannel in scaling config and refactor blockQueueSize config; Add PipelineChannelFactory SPI and MEMORY impl; Integrate streamChannel with incremental and inventory task (#14624)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 67e6590 Add streamChannel in scaling config and refactor blockQueueSize config; Add PipelineChannelFactory SPI and MEMORY impl; Integrate streamChannel with incremental and inventory task (#14624)
67e6590 is described below
commit 67e6590665caf2724de6fa96f272b369759c3172
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Sat Jan 8 19:57:30 2022 +0800
Add streamChannel in scaling config and refactor blockQueueSize config; Add PipelineChannelFactory SPI and MEMORY impl; Integrate streamChannel with incremental and inventory task (#14624)
---
.../user-manual/shardingsphere-scaling/build.cn.md | 10 +++-
.../user-manual/shardingsphere-scaling/build.en.md | 10 +++-
.../resources/yaml/encrypt-dataConverters.yaml | 5 +-
...hardingRuleAlteredJobConfigurationPreparer.java | 14 +----
.../src/test/resources/yaml/sharding-rule.yaml | 5 +-
.../src/test/resources/yaml/sharding-scaling.yaml | 5 +-
.../OnRuleAlteredActionConfiguration.java | 4 +-
.../YamlOnRuleAlteredActionConfiguration.java | 4 +-
...nRuleAlteredActionConfigurationYamlSwapper.java | 5 +-
...eAlteredActionConfigurationYamlSwapperTest.java | 6 +-
.../distribution/AutoAcknowledgeChannel.java | 2 +-
.../ingest/channel/distribution/BitSetChannel.java | 7 +--
.../channel/distribution/BlockingQueueChannel.java | 12 ++--
.../ingest/channel/distribution/MemoryChannel.java | 10 ++--
.../ingest/dumper/AbstractInventoryDumper.java | 5 +-
.../core/prepare/InventoryTaskSplitter.java | 6 +-
.../channel/MemoryPipelineChannelFactory.java | 68 ++++++++++++++++++++++
.../data/pipeline/core/task/IncrementalTask.java | 12 +++-
.../data/pipeline/core/task/InventoryTask.java | 11 +++-
.../scenario/rulealtered/RuleAlteredContext.java | 12 ++++
.../rulealtered/RuleAlteredJobPreparer.java | 7 ++-
...eline.spi.ingest.channel.PipelineChannelFactory | 27 +--------
.../mysql/ingest/MySQLIncrementalDumper.java | 5 +-
.../opengauss/ingest/OpenGaussWalDumper.java | 5 +-
.../postgresql/ingest/PostgreSQLWalDumper.java | 5 +-
.../api/config/ingest/DumperConfiguration.java | 2 -
.../ingest/InventoryDumperConfiguration.java | 1 -
.../pipeline/api}/ingest/channel/AckCallback.java | 2 +-
.../data/pipeline/api/ingest/channel/Channel.java | 13 +++--
.../ingest/channel/PipelineChannelFactory.java} | 21 ++++---
.../src/main/resources/conf/config-sharding.yaml | 10 +++-
.../api/impl/GovernanceRepositoryAPIImplTest.java | 4 +-
.../channel/distribution/MemoryChannelTest.java | 2 +-
.../core/prepare/InventoryTaskSplitterTest.java | 16 +++--
.../pipeline/core/task/IncrementalTaskTest.java | 3 +-
.../data/pipeline/core/task/InventoryTaskTest.java | 6 +-
.../pipeline/core/util/RuleAlteredContextUtil.java | 13 +++++
.../config_sharding_sphere_jdbc_source.yaml | 5 +-
.../config_sharding_sphere_jdbc_target.yaml | 5 +-
39 files changed, 238 insertions(+), 127 deletions(-)
diff --git a/docs/document/content/user-manual/shardingsphere-scaling/build.cn.md b/docs/document/content/user-manual/shardingsphere-scaling/build.cn.md
index 50547dc..9ea5edd 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/build.cn.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/build.cn.md
@@ -53,7 +53,6 @@ rules:
scalingName: # 启用的弹性伸缩配置名称
scaling:
<scaling-action-config-name> (+):
- blockQueueSize: # 数据通道阻塞队列大小
input:
workerThread: # 从源端摄取全量数据的线程池大小
batchSize: # 一次查询操作返回的最大记录数
@@ -68,6 +67,10 @@ rules:
type: # 算法类型。可选项:TPS
props: # 算法属性
tps: # tps属性。适用算法类型:TPS
+ streamChannel: # 数据通道,连接生产者和消费者,用于 input 和 output 环节。如果没配置则默认使用 MEMORY 类型
+ type: # 算法类型。可选项:MEMORY
+ props: # 算法属性
+ block-queue-size: # 属性:阻塞队列大小
completionDetector: # 作业是否接近完成检测算法。如果不配置,那么系统无法自动进行后续步骤,可以通过 DistSQL 手动操作。
type: # 算法类型。可选项:IDLE
props: # 算法属性
@@ -87,7 +90,6 @@ rules:
scalingName: default_scaling
scaling:
default_scaling:
- blockQueueSize: 10000
input:
workerThread: 40
batchSize: 1000
@@ -102,6 +104,10 @@ rules:
type: TPS
props:
tps: 2000
+ streamChannel:
+ type: MEMORY
+ props:
+ block-queue-size: 10000
completionDetector:
type: IDLE
props:
diff --git a/docs/document/content/user-manual/shardingsphere-scaling/build.en.md b/docs/document/content/user-manual/shardingsphere-scaling/build.en.md
index 3fe6b41..fa58430 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/build.en.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/build.en.md
@@ -52,7 +52,6 @@ rules:
scalingName: # Enabled scaling action config name
scaling:
<scaling-action-config-name> (+):
- blockQueueSize: # Data channel blocking queue size
input:
workerThread: # Worker thread pool size for inventory data ingestion from source
batchSize: # Maximum records count of a DML select operation
@@ -67,6 +66,10 @@ rules:
type: # Algorithm type. Options: TPS
props: # Algorithm properties
tps: # TPS property. Available for types: TPS
+ streamChannel: # Algorithm of the channel that connects producer and consumer, used for input and output. If it's not configured, the system will use the MEMORY type
+ type: # Algorithm type. Options: MEMORY
+ props: # Algorithm properties
+ block-queue-size: # Property: data channel block queue size. Available for types: MEMORY
completionDetector: # Completion detection algorithm. If it's not configured, then system won't continue to do next steps automatically.
type: # Algorithm type. Options: IDLE
props: # Algorithm properties
@@ -86,7 +89,6 @@ rules:
scalingName: default_scaling
scaling:
default_scaling:
- blockQueueSize: 10000
input:
workerThread: 40
batchSize: 1000
@@ -101,6 +103,10 @@ rules:
type: TPS
props:
tps: 2000
+ streamChannel:
+ type: MEMORY
+ props:
+ block-queue-size: 10000
completionDetector:
type: IDLE
props:
diff --git a/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml b/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
index 39d44da..04a1ae2 100644
--- a/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
+++ b/shardingsphere-features/shardingsphere-encrypt/shardingsphere-encrypt-core/src/test/resources/yaml/encrypt-dataConverters.yaml
@@ -18,7 +18,6 @@
dataConverterName: default_convert
dataConverters:
default_convert:
- blockQueueSize: 10000
input:
workerThread: 40
batchSize: 1000
@@ -33,6 +32,10 @@ dataConverters:
type: TPS
props:
tps: 2000
+ streamChannel:
+ type: MEMORY
+ props:
+ block-queue-size: 10000
completionDetector:
type: IDLE
props:
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparer.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparer.java
index b761b6e..aa2419c 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparer.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparer.java
@@ -34,7 +34,6 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Shardi
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparer;
import org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
-import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
@@ -140,18 +139,13 @@ public final class ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
tableMap.put(dataNode.getTableName(), each.getLogicTableName());
}
}
- OnRuleAlteredActionConfiguration ruleAlteredActionConfig = getRuleAlteredActionConfig(targetRuleConfig.orElse(sourceRuleConfig)).orElse(null);
- DumperConfiguration dumperConfig = createDumperConfig(dataSourceName, sourceDataSource.get(dataSourceName).getProps(), tableMap, ruleAlteredActionConfig);
+ DumperConfiguration dumperConfig = createDumperConfig(dataSourceName, sourceDataSource.get(dataSourceName).getProps(), tableMap);
ImporterConfiguration importerConfig = createImporterConfig(pipelineConfig, handleConfig, shardingColumnsMap);
TaskConfiguration taskConfig = new TaskConfiguration(handleConfig, dumperConfig, importerConfig);
log.info("toTaskConfigs, dataSourceName={}, taskConfig={}", dataSourceName, taskConfig);
return Collections.singletonList(taskConfig);
}
- private Optional<OnRuleAlteredActionConfiguration> getRuleAlteredActionConfig(final ShardingRuleConfiguration shardingRuleConfig) {
- return Optional.ofNullable(shardingRuleConfig.getScaling().get(shardingRuleConfig.getScalingName()));
- }
-
private static ShardingSpherePipelineDataSourceConfiguration getSourceConfiguration(final PipelineConfiguration pipelineConfig) {
PipelineDataSourceConfiguration result = PipelineDataSourceConfigurationFactory.newInstance(pipelineConfig.getSource().getType(), pipelineConfig.getSource().getParameter());
return (ShardingSpherePipelineDataSourceConfiguration) result;
@@ -194,15 +188,11 @@ public final class ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
return Collections.emptySet();
}
- private static DumperConfiguration createDumperConfig(final String dataSourceName, final Map<String, Object> props, final Map<String, String> tableMap,
- final OnRuleAlteredActionConfiguration ruleAlteredActionConfig) {
+ private static DumperConfiguration createDumperConfig(final String dataSourceName, final Map<String, Object> props, final Map<String, String> tableMap) {
DumperConfiguration result = new DumperConfiguration();
result.setDataSourceName(dataSourceName);
result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration(YamlEngine.marshal(props)));
result.setTableNameMap(tableMap);
- if (null != ruleAlteredActionConfig) {
- result.setBlockQueueSize(ruleAlteredActionConfig.getBlockQueueSize());
- }
return result;
}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
index 4dd83cf..58f8850 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-rule.yaml
@@ -108,7 +108,6 @@ rules:
scalingName: default_scaling
scaling:
default_scaling:
- blockQueueSize: 10000
input:
workerThread: 40
batchSize: 1000
@@ -123,6 +122,10 @@ rules:
type: TPS
props:
tps: 2000
+ streamChannel:
+ type: MEMORY
+ props:
+ block-queue-size: 10000
completionDetector:
type: IDLE
props:
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
index bb8027d..20ee9fe 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
@@ -18,7 +18,6 @@
scalingName: default_scaling
scaling:
default_scaling:
- blockQueueSize: 10000
input:
workerThread: 40
batchSize: 1000
@@ -33,6 +32,10 @@ scaling:
type: TPS
props:
tps: 2000
+ streamChannel:
+ type: MEMORY
+ props:
+ block-queue-size: 10000
completionDetector:
type: IDLE
props:
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
index 07a82a1..f917025 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rulealtered/OnRuleAlteredActionConfiguration.java
@@ -30,12 +30,12 @@ import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmC
@ToString
public final class OnRuleAlteredActionConfiguration {
- private final int blockQueueSize;
-
private final InputConfiguration input;
private final OutputConfiguration output;
+ private final ShardingSphereAlgorithmConfiguration streamChannel;
+
private final ShardingSphereAlgorithmConfiguration completionDetector;
private final ShardingSphereAlgorithmConfiguration dataConsistencyChecker;
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
index 4934ea8..d8875e7 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/rulealtered/YamlOnRuleAlteredActionConfiguration.java
@@ -32,12 +32,12 @@ import org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlShardingSp
@ToString
public final class YamlOnRuleAlteredActionConfiguration implements YamlConfiguration {
- private int blockQueueSize = 10000;
-
private YamlInputConfiguration input;
private YamlOutputConfiguration output;
+ private YamlShardingSphereAlgorithmConfiguration streamChannel;
+
private YamlShardingSphereAlgorithmConfiguration completionDetector;
private YamlShardingSphereAlgorithmConfiguration dataConsistencyChecker;
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
index 11e959e..744e171 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapper.java
@@ -43,9 +43,9 @@ public final class OnRuleAlteredActionConfigurationYamlSwapper implements YamlCo
return null;
}
YamlOnRuleAlteredActionConfiguration result = new YamlOnRuleAlteredActionConfiguration();
- result.setBlockQueueSize(data.getBlockQueueSize());
result.setInput(INPUT_CONFIG_SWAPPER.swapToYamlConfiguration(data.getInput()));
result.setOutput(OUTPUT_CONFIG_SWAPPER.swapToYamlConfiguration(data.getOutput()));
+ result.setStreamChannel(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getStreamChannel()));
result.setCompletionDetector(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getCompletionDetector()));
result.setDataConsistencyChecker(ALGORITHM_CONFIG_YAML_SWAPPER.swapToYamlConfiguration(data.getDataConsistencyChecker()));
return result;
@@ -56,9 +56,10 @@ public final class OnRuleAlteredActionConfigurationYamlSwapper implements YamlCo
if (null == yamlConfig) {
return null;
}
- return new OnRuleAlteredActionConfiguration(yamlConfig.getBlockQueueSize(),
+ return new OnRuleAlteredActionConfiguration(
INPUT_CONFIG_SWAPPER.swapToObject(yamlConfig.getInput()),
OUTPUT_CONFIG_SWAPPER.swapToObject(yamlConfig.getOutput()),
+ ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getStreamChannel()),
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getCompletionDetector()),
ALGORITHM_CONFIG_YAML_SWAPPER.swapToObject(yamlConfig.getDataConsistencyChecker()));
}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
index 25b9f8f..e8b0a67 100644
--- a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rulealtered/OnRuleAlteredActionConfigurationYamlSwapperTest.java
@@ -38,20 +38,22 @@ public final class OnRuleAlteredActionConfigurationYamlSwapperTest {
@Test
public void assertSwap() {
YamlOnRuleAlteredActionConfiguration yamlConfig = new YamlOnRuleAlteredActionConfiguration();
- yamlConfig.setBlockQueueSize(1000);
Properties rateLimiterProps = new Properties();
rateLimiterProps.setProperty("batch-size", "1000");
rateLimiterProps.setProperty("qps", "50");
YamlInputConfiguration yamlInputConfig = new YamlInputConfiguration();
+ yamlConfig.setInput(yamlInputConfig);
yamlInputConfig.setWorkerThread(40);
yamlInputConfig.setBatchSize(1000);
yamlInputConfig.setRateLimiter(new YamlShardingSphereAlgorithmConfiguration("INPUT", rateLimiterProps));
- yamlConfig.setInput(yamlInputConfig);
YamlOutputConfiguration yamlOutputConfig = new YamlOutputConfiguration();
yamlOutputConfig.setWorkerThread(40);
yamlOutputConfig.setBatchSize(1000);
yamlOutputConfig.setRateLimiter(new YamlShardingSphereAlgorithmConfiguration("OUTPUT", rateLimiterProps));
yamlConfig.setOutput(yamlOutputConfig);
+ Properties streamChannelProps = new Properties();
+ streamChannelProps.setProperty("block-queue-size", "10000");
+ yamlConfig.setStreamChannel(new YamlShardingSphereAlgorithmConfiguration("MEMORY", streamChannelProps));
Properties completionDetectorProps = new Properties();
completionDetectorProps.setProperty("incremental-task-idle-minute-threshold", "30");
yamlConfig.setCompletionDetector(new YamlShardingSphereAlgorithmConfiguration("IDLE", completionDetectorProps));
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/AutoAcknowledgeChannel.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/AutoAcknowledgeChannel.java
index 2d04a30..ea1e508 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/AutoAcknowledgeChannel.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/AutoAcknowledgeChannel.java
@@ -34,7 +34,7 @@ public final class AutoAcknowledgeChannel extends AbstractBitSetChannel {
}
@Override
- public List<Record> fetchRecords(final int batchSize, final int timeout) {
+ public List<Record> fetchRecords(final int batchSize, final int timeoutSeconds) {
throw new UnsupportedOperationException("Auto ack channel can not fetch records.");
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/BitSetChannel.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/BitSetChannel.java
index 77a4cde..0ca3b30 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/BitSetChannel.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/BitSetChannel.java
@@ -32,18 +32,17 @@ public interface BitSetChannel {
*
* @param dataRecord data
* @param index data index
- * @throws InterruptedException if thread interrupted
*/
- void pushRecord(Record dataRecord, long index) throws InterruptedException;
+ void pushRecord(Record dataRecord, long index);
/**
* Fetch {@code Record} from channel; if the timeout is reached, the records are returned as well.
*
* @param batchSize record batch size
- * @param timeout timeout(seconds)
+ * @param timeoutSeconds timeout(seconds)
* @return record
*/
- List<Record> fetchRecords(int batchSize, int timeout);
+ List<Record> fetchRecords(int batchSize, int timeoutSeconds);
/**
* Ack the last batch.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/BlockingQueueChannel.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/BlockingQueueChannel.java
index b4d4c24..e88cebd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/BlockingQueueChannel.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/BlockingQueueChannel.java
@@ -43,18 +43,22 @@ public final class BlockingQueueChannel extends AbstractBitSetChannel {
}
@Override
- public void pushRecord(final Record dataRecord, final long index) throws InterruptedException {
+ public void pushRecord(final Record dataRecord, final long index) {
getManualBitSet().set(index);
- queue.put(dataRecord);
+ try {
+ queue.put(dataRecord);
+ } catch (final InterruptedException ex) {
+ throw new RuntimeException("put " + dataRecord + " into queue at index " + index + " failed", ex);
+ }
}
// TODO thread-safe?
@Override
- public List<Record> fetchRecords(final int batchSize, final int timeout) {
+ public List<Record> fetchRecords(final int batchSize, final int timeoutSeconds) {
List<Record> result = new ArrayList<>(batchSize);
long start = System.currentTimeMillis();
while (batchSize > queue.size()) {
- if (timeout * 1000L <= System.currentTimeMillis() - start) {
+ if (timeoutSeconds * 1000L <= System.currentTimeMillis() - start) {
break;
}
ThreadUtil.sleep(100L);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryChannel.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryChannel.java
index bc13ae1..b704955 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryChannel.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryChannel.java
@@ -18,12 +18,12 @@
package org.apache.shardingsphere.data.pipeline.core.ingest.channel.distribution;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.AckCallback;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.core.ingest.channel.AckCallback;
import java.util.BitSet;
import java.util.HashMap;
@@ -87,7 +87,7 @@ public final class MemoryChannel implements Channel {
}
@Override
- public void pushRecord(final Record record) throws InterruptedException {
+ public void pushRecord(final Record record) {
if (FinishedRecord.class.equals(record.getClass())) {
for (int i = 0; i < channels.length; i++) {
pushRecord(record, i);
@@ -101,14 +101,14 @@ public final class MemoryChannel implements Channel {
}
}
- private void pushRecord(final Record record, final int index) throws InterruptedException {
+ private void pushRecord(final Record record, final int index) {
toBeAckBitSetIndexes.add(index);
getBitSetChannel(index).pushRecord(record, indexAutoIncreaseGenerator.getAndIncrement());
}
@Override
- public List<Record> fetchRecords(final int batchSize, final int timeout) {
- return findChannel().fetchRecords(batchSize, timeout);
+ public List<Record> fetchRecords(final int batchSize, final int timeoutSeconds) {
+ return findChannel().fetchRecords(batchSize, timeoutSeconds);
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
index 6860835..98b39bb 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
@@ -187,9 +187,6 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
}
private void pushRecord(final Record record) {
- try {
- channel.pushRecord(record);
- } catch (final InterruptedException ignored) {
- }
+ channel.pushRecord(record);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index b7a1b03..0b2cd1a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepare
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration.InputConfiguration;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
@@ -64,15 +65,16 @@ public final class InventoryTaskSplitter {
* @param jobContext job context
* @param taskConfig task configuration
* @param dataSourceManager data source manager
+ * @param pipelineChannelFactory channel factory
* @param importerExecuteEngine execute engine
* @return split inventory data task
*/
// TODO remove jobContext, use init JobProgress - sourceDatabaseType - readBatchSize - rateLimitAlgorithm
public List<InventoryTask> splitInventoryData(final RuleAlteredJobContext jobContext, final TaskConfiguration taskConfig, final PipelineDataSourceManager dataSourceManager,
- final ExecuteEngine importerExecuteEngine) {
+ final PipelineChannelFactory pipelineChannelFactory, final ExecuteEngine importerExecuteEngine) {
List<InventoryTask> result = new LinkedList<>();
for (InventoryDumperConfiguration each : splitDumperConfig(jobContext, taskConfig.getDumperConfig(), dataSourceManager)) {
- result.add(new InventoryTask(each, taskConfig.getImporterConfig(), importerExecuteEngine));
+ result.add(new InventoryTask(each, taskConfig.getImporterConfig(), pipelineChannelFactory, importerExecuteEngine));
}
return result;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ingest/channel/MemoryPipelineChannelFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ingest/channel/MemoryPipelineChannelFactory.java
new file mode 100644
index 0000000..44d00fd
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ingest/channel/MemoryPipelineChannelFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.spi.ingest.channel;
+
+import com.google.common.base.Strings;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.AckCallback;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import org.apache.shardingsphere.data.pipeline.core.ingest.channel.distribution.MemoryChannel;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
+
+import java.util.Properties;
+
+/**
+ * Memory implementation of pipeline channel factory.
+ */
+public final class MemoryPipelineChannelFactory implements PipelineChannelFactory { // Creates MemoryChannel instances; queue size is configurable through props.
+
+ public static final String TYPE = "MEMORY"; // Algorithm type name reported by getType().
+
+ private static final String BLOCK_QUEUE_SIZE_KEY = "block-queue-size"; // Property key looked up in init().
+
+ private int blockQueueSize = 10000; // Default; kept when the property is absent or empty.
+
+ private Properties props = new Properties(); // Replaced wholesale by setProps().
+
+ @Override
+ public Properties getProps() { // Returns the current properties object itself, not a copy.
+ return props;
+ }
+
+ @Override
+ public void setProps(final Properties props) { // Stores the given reference as-is; no null check is performed here.
+ this.props = props;
+ }
+
+ @Override
+ public void init() { // Overrides blockQueueSize from props when "block-queue-size" has a non-empty value.
+ String blockQueueSizeValue = props.getProperty(BLOCK_QUEUE_SIZE_KEY);
+ if (!Strings.isNullOrEmpty(blockQueueSizeValue)) {
+ blockQueueSize = Integer.parseInt(blockQueueSizeValue); // NOTE(review): non-numeric text throws NumberFormatException and the parsed value is not range-checked.
+ }
+ }
+
+ @Override
+ public Channel createPipelineChannel(final int outputConcurrency, final AckCallback ackCallback) { // Builds a new MemoryChannel on every call.
+ return new MemoryChannel(outputConcurrency, blockQueueSize, ackCallback); // blockQueueSize is the value in effect at call time.
+ }
+
+ @Override
+ public String getType() { // Returns the TYPE constant ("MEMORY").
+ return TYPE;
+ }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 3c1ce54..c1289c0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
@@ -30,9 +31,9 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import org.apache.shardingsphere.data.pipeline.core.ingest.channel.distribution.MemoryChannel;
import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterListener;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
import org.apache.shardingsphere.scaling.core.job.dumper.DumperFactory;
import org.apache.shardingsphere.scaling.core.job.importer.ImporterFactory;
@@ -53,12 +54,15 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements
@Getter
private final String taskId;
+ // TODO Move into the `output` config and ImporterConfiguration. Why is it needed only for the incremental task and not for the inventory task?
private final int concurrency;
private final DumperConfiguration dumperConfig;
private final ImporterConfiguration importerConfig;
+ private final PipelineChannelFactory pipelineChannelFactory;
+
private final ExecuteEngine incrementalDumperExecuteEngine;
private final PipelineDataSourceManager dataSourceManager;
@@ -68,10 +72,12 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements
@Getter
private final IncrementalTaskProgress progress;
- public IncrementalTask(final int concurrency, final DumperConfiguration dumperConfig, final ImporterConfiguration importerConfig, final ExecuteEngine incrementalDumperExecuteEngine) {
+ public IncrementalTask(final int concurrency, final DumperConfiguration dumperConfig, final ImporterConfiguration importerConfig,
+ final PipelineChannelFactory pipelineChannelFactory, final ExecuteEngine incrementalDumperExecuteEngine) {
this.concurrency = concurrency;
this.dumperConfig = dumperConfig;
this.importerConfig = importerConfig;
+ this.pipelineChannelFactory = pipelineChannelFactory;
this.incrementalDumperExecuteEngine = incrementalDumperExecuteEngine;
dataSourceManager = new PipelineDataSourceManager();
taskId = dumperConfig.getDataSourceName();
@@ -100,7 +106,7 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements
}
private void instanceChannel(final Collection<Importer> importers) {
- MemoryChannel channel = new MemoryChannel(importers.size(), dumperConfig.getBlockQueueSize(), records -> {
+ Channel channel = pipelineChannelFactory.createPipelineChannel(importers.size(), records -> {
Record lastHandledRecord = records.get(records.size() - 1);
if (!(lastHandledRecord.getPosition() instanceof PlaceholderPosition)) {
progress.setPosition(lastHandledRecord.getPosition());
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index f9221bf..5c4ecea 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
@@ -31,8 +32,8 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import org.apache.shardingsphere.data.pipeline.core.ingest.channel.distribution.MemoryChannel;
import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
import org.apache.shardingsphere.scaling.core.job.dumper.DumperFactory;
import org.apache.shardingsphere.scaling.core.job.importer.ImporterFactory;
@@ -55,6 +56,8 @@ public final class InventoryTask extends AbstractLifecycleExecutor implements Pi
private final ImporterConfiguration importerConfig;
+ private final PipelineChannelFactory pipelineChannelFactory;
+
private final ExecuteEngine importerExecuteEngine;
private final PipelineDataSourceManager dataSourceManager;
@@ -63,9 +66,11 @@ public final class InventoryTask extends AbstractLifecycleExecutor implements Pi
private volatile IngestPosition<?> position;
- public InventoryTask(final InventoryDumperConfiguration inventoryDumperConfig, final ImporterConfiguration importerConfig, final ExecuteEngine importerExecuteEngine) {
+ public InventoryTask(final InventoryDumperConfiguration inventoryDumperConfig, final ImporterConfiguration importerConfig,
+ final PipelineChannelFactory pipelineChannelFactory, final ExecuteEngine importerExecuteEngine) {
this.inventoryDumperConfig = inventoryDumperConfig;
this.importerConfig = importerConfig;
+ this.pipelineChannelFactory = pipelineChannelFactory;
this.importerExecuteEngine = importerExecuteEngine;
this.dataSourceManager = new PipelineDataSourceManager();
taskId = generateTaskId(inventoryDumperConfig);
@@ -106,7 +111,7 @@ public final class InventoryTask extends AbstractLifecycleExecutor implements Pi
}
private void instanceChannel(final Importer importer) {
- MemoryChannel channel = new MemoryChannel(inventoryDumperConfig.getBlockQueueSize(), records -> {
+ Channel channel = pipelineChannelFactory.createPipelineChannel(1, records -> {
Optional<Record> record = records.stream().filter(each -> !(each.getPosition() instanceof PlaceholderPosition)).reduce((a, b) -> b);
record.ifPresent(value -> position = value.getPosition());
});
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
index 8bb0bdf..dd275ac 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
@@ -21,8 +21,10 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCompletedParameter;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.spi.ingest.channel.MemoryPipelineChannelFactory;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.detect.JobCompletionDetectAlgorithm;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
import org.apache.shardingsphere.data.pipeline.spi.lock.RowBasedJobLockAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.lock.RuleBasedJobLockAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
@@ -39,6 +41,8 @@ import org.apache.shardingsphere.infra.yaml.config.swapper.rulealtered.OnRuleAlt
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
+import java.util.Properties;
+
/**
* Rule altered context.
*/
@@ -49,6 +53,7 @@ public final class RuleAlteredContext {
static {
ShardingSphereServiceLoader.register(JobRateLimitAlgorithm.class);
+ ShardingSphereServiceLoader.register(PipelineChannelFactory.class);
ShardingSphereServiceLoader.register(JobCompletionDetectAlgorithm.class);
ShardingSphereServiceLoader.register(RowBasedJobLockAlgorithm.class);
ShardingSphereServiceLoader.register(DataConsistencyCheckAlgorithm.class);
@@ -65,6 +70,8 @@ public final class RuleAlteredContext {
private final JobRateLimitAlgorithm outputRateLimitAlgorithm;
+ private final PipelineChannelFactory pipelineChannelFactory;
+
private final JobCompletionDetectAlgorithm<RuleAlteredJobAlmostCompletedParameter> completionDetectAlgorithm;
private final RowBasedJobLockAlgorithm sourceWritingStopAlgorithm;
@@ -93,6 +100,11 @@ public final class RuleAlteredContext {
}
ShardingSphereAlgorithmConfiguration outputRateLimiter = outputConfig.getRateLimiter();
outputRateLimitAlgorithm = null != outputRateLimiter ? ShardingSphereAlgorithmFactory.createAlgorithm(outputRateLimiter, JobRateLimitAlgorithm.class) : null;
+ ShardingSphereAlgorithmConfiguration streamChannel = onRuleAlteredActionConfig.getStreamChannel();
+ if (null == streamChannel) {
+ streamChannel = new ShardingSphereAlgorithmConfiguration(MemoryPipelineChannelFactory.TYPE, new Properties());
+ }
+ pipelineChannelFactory = ShardingSphereAlgorithmFactory.createAlgorithm(streamChannel, PipelineChannelFactory.class);
ShardingSphereAlgorithmConfiguration completionDetector = onRuleAlteredActionConfig.getCompletionDetector();
completionDetectAlgorithm = null != completionDetector ? ShardingSphereAlgorithmFactory.createAlgorithm(completionDetector, JobCompletionDetectAlgorithm.class) : null;
sourceWritingStopAlgorithm = null;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 04c707b..480d6c5 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.data.pipeline.core.prepare.InventoryTaskSplitte
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
import org.apache.shardingsphere.data.pipeline.spi.rulealtered.DataSourcePreparer;
import org.apache.shardingsphere.scaling.core.job.check.EnvironmentCheckerFactory;
@@ -110,20 +111,22 @@ public final class RuleAlteredJobPreparer {
}
private void initInventoryTasks(final RuleAlteredJobContext jobContext, final PipelineDataSourceManager dataSourceManager) {
+ PipelineChannelFactory pipelineChannelFactory = jobContext.getRuleAlteredContext().getPipelineChannelFactory();
ExecuteEngine importerExecuteEngine = jobContext.getRuleAlteredContext().getImporterExecuteEngine();
List<InventoryTask> allInventoryTasks = new LinkedList<>();
for (TaskConfiguration each : jobContext.getTaskConfigs()) {
- allInventoryTasks.addAll(inventoryTaskSplitter.splitInventoryData(jobContext, each, dataSourceManager, importerExecuteEngine));
+ allInventoryTasks.addAll(inventoryTaskSplitter.splitInventoryData(jobContext, each, dataSourceManager, pipelineChannelFactory, importerExecuteEngine));
}
jobContext.getInventoryTasks().addAll(allInventoryTasks);
}
private void initIncrementalTasks(final RuleAlteredJobContext jobContext, final PipelineDataSourceManager dataSourceManager) throws SQLException {
+ PipelineChannelFactory pipelineChannelFactory = jobContext.getRuleAlteredContext().getPipelineChannelFactory();
ExecuteEngine incrementalDumperExecuteEngine = jobContext.getRuleAlteredContext().getIncrementalDumperExecuteEngine();
for (TaskConfiguration each : jobContext.getTaskConfigs()) {
each.getDumperConfig().setPosition(getIncrementalPosition(jobContext, each, dataSourceManager));
jobContext.getIncrementalTasks().add(new IncrementalTask(each.getHandleConfig().getConcurrency(),
- each.getDumperConfig(), each.getImporterConfig(), incrementalDumperExecuteEngine));
+ each.getDumperConfig(), each.getImporterConfig(), pipelineChannelFactory, incrementalDumperExecuteEngine));
}
}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory
similarity index 59%
copy from shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory
index bb8027d..7c0c3d6 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/sharding-scaling.yaml
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory
@@ -15,29 +15,4 @@
# limitations under the License.
#
-scalingName: default_scaling
-scaling:
- default_scaling:
- blockQueueSize: 10000
- input:
- workerThread: 40
- batchSize: 1000
- rateLimiter:
- type: QPS
- props:
- qps: 50
- output:
- workerThread: 40
- batchSize: 1000
- rateLimiter:
- type: TPS
- props:
- tps: 2000
- completionDetector:
- type: IDLE
- props:
- incremental-task-idle-minute-threshold: 30
- dataConsistencyChecker:
- type: DATA_MATCH
- props:
- chunk-size: 1000
+org.apache.shardingsphere.data.pipeline.core.spi.ingest.channel.MemoryPipelineChannelFactory
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index def627a..da8b6a1 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -199,9 +199,6 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
}
private void pushRecord(final Record record) {
- try {
- channel.pushRecord(record);
- } catch (final InterruptedException ignored) {
- }
+ channel.pushRecord(record);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
index 504da0e..eb1d197 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
@@ -130,10 +130,7 @@ public final class OpenGaussWalDumper extends AbstractLifecycleExecutor implemen
}
private void pushRecord(final Record record) {
- try {
- channel.pushRecord(record);
- } catch (final InterruptedException ignored) {
- }
+ channel.pushRecord(record);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
index 20c046a..fcd6a4c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
@@ -100,10 +100,7 @@ public final class PostgreSQLWalDumper extends AbstractLifecycleExecutor impleme
}
private void pushRecord(final Record record) {
- try {
- channel.pushRecord(record);
- } catch (final InterruptedException ignored) {
- }
+ channel.pushRecord(record);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
index 8792f0d..2d6525d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
@@ -37,8 +37,6 @@ public class DumperConfiguration {
private PipelineDataSourceConfiguration dataSourceConfig;
- private int blockQueueSize = 10000;
-
private IngestPosition<?> position;
/**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
index 4bf773c..52716cc 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
@@ -44,6 +44,5 @@ public final class InventoryDumperConfiguration extends DumperConfiguration {
setDataSourceName(dumperConfig.getDataSourceName());
setDataSourceConfig(dumperConfig.getDataSourceConfig());
setTableNameMap(dumperConfig.getTableNameMap());
- setBlockQueueSize(dumperConfig.getBlockQueueSize());
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/AckCallback.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/AckCallback.java
similarity index 94%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/AckCallback.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/AckCallback.java
index 5a3bd0e..5446d34 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/AckCallback.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/AckCallback.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.ingest.channel;
+package org.apache.shardingsphere.data.pipeline.api.ingest.channel;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/Channel.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/Channel.java
index ccb1847..51cd0f0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/Channel.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/channel/Channel.java
@@ -24,24 +24,25 @@ import java.util.List;
/**
* Channel.
*/
+// TODO rename to PipelineChannel
public interface Channel {
/**
- * push a {@code DataRecord} to channel.
+ * Push {@code DataRecord} into channel.
*
* @param dataRecord data
- * @throws InterruptedException if thread interrupted
*/
- void pushRecord(Record dataRecord) throws InterruptedException;
+ void pushRecord(Record dataRecord);
/**
- * fetch {@code Record} from channel, if the timeout also returns the record.
+ * Fetch {@code Record} list from channel.
+ * It may block for at most {@code timeoutSeconds} seconds if the number of available records does not reach the batch size.
*
* @param batchSize record batch size
- * @param timeout timeout(seconds)
+ * @param timeoutSeconds timeout(seconds)
* @return record
*/
- List<Record> fetchRecords(int batchSize, int timeout);
+ List<Record> fetchRecords(int batchSize, int timeoutSeconds);
/**
* Ack the last batch.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/AckCallback.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelFactory.java
similarity index 51%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/AckCallback.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelFactory.java
index 5a3bd0e..22277e8 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/AckCallback.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelFactory.java
@@ -15,21 +15,24 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.ingest.channel;
+package org.apache.shardingsphere.data.pipeline.spi.ingest.channel;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-
-import java.util.List;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.AckCallback;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.Channel;
+import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithm;
+import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmPostProcessor;
/**
- * Record acknowledged callback.
+ * Pipeline channel factory, SPI.
*/
-public interface AckCallback {
+public interface PipelineChannelFactory extends ShardingSphereAlgorithm, ShardingSphereAlgorithmPostProcessor {
/**
- * Call after record acknowledged.
+ * Create pipeline channel.
*
- * @param records acknowledged record list
+ * @param outputConcurrency output concurrency
+ * @param ackCallback ack callback
+ * @return {@link Channel}
*/
- void onAck(List<Record> records);
+ Channel createPipelineChannel(int outputConcurrency, AckCallback ackCallback);
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
index efdaa29..0ee1c67 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
+++ b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/resources/conf/config-sharding.yaml
@@ -97,7 +97,6 @@
# scalingName: default_scaling
# scaling:
# default_scaling:
-# blockQueueSize: 10000
# input:
# workerThread: 40
# batchSize: 1000
@@ -112,6 +111,10 @@
# type: TPS
# props:
# tps: 2000
+# streamChannel:
+# type: MEMORY
+# props:
+# block-queue-size: 10000
# completionDetector:
# type: IDLE
# props:
@@ -202,7 +205,6 @@
# scalingName: default_scaling
# scaling:
# default_scaling:
-# blockQueueSize: 10000
# input:
# workerThread: 40
# batchSize: 1000
@@ -217,6 +219,10 @@
# type: TPS
# props:
# tps: 2000
+# streamChannel:
+# type: MEMORY
+# props:
+# block-queue-size: 10000
# completionDetector:
# type: IDLE
# props:
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index 2c8c92c..78dfafc 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -129,12 +129,12 @@ public final class GovernanceRepositoryAPIImplTest {
dumperConfig.setTableName("t_order");
dumperConfig.setPrimaryKey("order_id");
dumperConfig.setShardingItem(0);
- return new InventoryTask(dumperConfig, taskConfig.getImporterConfig(), RuleAlteredContextUtil.getExecuteEngine());
+ return new InventoryTask(dumperConfig, taskConfig.getImporterConfig(), RuleAlteredContextUtil.getPipelineChannelFactory(), RuleAlteredContextUtil.getExecuteEngine());
}
private IncrementalTask mockIncrementalTask(final TaskConfiguration taskConfig) {
DumperConfiguration dumperConfig = taskConfig.getDumperConfig();
dumperConfig.setPosition(new PlaceholderPosition());
- return new IncrementalTask(3, dumperConfig, taskConfig.getImporterConfig(), RuleAlteredContextUtil.getExecuteEngine());
+ return new IncrementalTask(3, dumperConfig, taskConfig.getImporterConfig(), RuleAlteredContextUtil.getPipelineChannelFactory(), RuleAlteredContextUtil.getExecuteEngine());
}
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryChannelTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryChannelTest.java
index 55318b7..96965a9 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryChannelTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/distribution/MemoryChannelTest.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.core.ingest.channel.AckCallback;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.AckCallback;
import org.junit.Test;
import java.util.Arrays;
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index 13ad21f..be7c39c 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -21,10 +21,12 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPosition;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.core.util.ResourceUtil;
import org.apache.shardingsphere.data.pipeline.core.util.RuleAlteredContextUtil;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -41,6 +43,10 @@ import static org.junit.Assert.assertThat;
public final class InventoryTaskSplitterTest {
+ private static final ExecuteEngine EXECUTE_ENGINE = RuleAlteredContextUtil.getExecuteEngine();
+
+ private static final PipelineChannelFactory PIPELINE_CHANNEL_FACTORY = RuleAlteredContextUtil.getPipelineChannelFactory();
+
private RuleAlteredJobContext jobContext;
private TaskConfiguration taskConfig;
@@ -60,7 +66,7 @@ public final class InventoryTaskSplitterTest {
public void assertSplitInventoryDataWithEmptyTable() throws SQLException {
taskConfig.getHandleConfig().setShardingSize(10);
initEmptyTablePrimaryEnvironment(taskConfig.getDumperConfig());
- List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, taskConfig, dataSourceManager, RuleAlteredContextUtil.getExecuteEngine());
+ List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, taskConfig, dataSourceManager, PIPELINE_CHANNEL_FACTORY, EXECUTE_ENGINE);
assertNotNull(actual);
assertThat(actual.size(), is(1));
assertThat(((PrimaryKeyPosition) actual.get(0).getProgress().getPosition()).getBeginValue(), is(0L));
@@ -71,7 +77,7 @@ public final class InventoryTaskSplitterTest {
public void assertSplitInventoryDataWithIntPrimary() throws SQLException {
taskConfig.getHandleConfig().setShardingSize(10);
initIntPrimaryEnvironment(taskConfig.getDumperConfig());
- List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, taskConfig, dataSourceManager, RuleAlteredContextUtil.getExecuteEngine());
+ List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, taskConfig, dataSourceManager, PIPELINE_CHANNEL_FACTORY, EXECUTE_ENGINE);
assertNotNull(actual);
assertThat(actual.size(), is(10));
assertThat(((PrimaryKeyPosition) actual.get(9).getProgress().getPosition()).getBeginValue(), is(91L));
@@ -81,7 +87,7 @@ public final class InventoryTaskSplitterTest {
@Test
public void assertSplitInventoryDataWithCharPrimary() throws SQLException {
initCharPrimaryEnvironment(taskConfig.getDumperConfig());
- List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, taskConfig, dataSourceManager, RuleAlteredContextUtil.getExecuteEngine());
+ List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, taskConfig, dataSourceManager, PIPELINE_CHANNEL_FACTORY, EXECUTE_ENGINE);
assertNotNull(actual);
assertThat(actual.size(), is(1));
}
@@ -89,7 +95,7 @@ public final class InventoryTaskSplitterTest {
@Test
public void assertSplitInventoryDataWithUnionPrimary() throws SQLException {
initUnionPrimaryEnvironment(taskConfig.getDumperConfig());
- List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, taskConfig, dataSourceManager, RuleAlteredContextUtil.getExecuteEngine());
+ List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, taskConfig, dataSourceManager, PIPELINE_CHANNEL_FACTORY, EXECUTE_ENGINE);
assertNotNull(actual);
assertThat(actual.size(), is(1));
}
@@ -97,7 +103,7 @@ public final class InventoryTaskSplitterTest {
@Test
public void assertSplitInventoryDataWithoutPrimary() throws SQLException {
initNoPrimaryEnvironment(taskConfig.getDumperConfig());
- List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, taskConfig, dataSourceManager, RuleAlteredContextUtil.getExecuteEngine());
+ List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext, taskConfig, dataSourceManager, PIPELINE_CHANNEL_FACTORY, EXECUTE_ENGINE);
assertNotNull(actual);
assertThat(actual.size(), is(1));
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index 96b06c4..c684563 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -44,7 +44,8 @@ public final class IncrementalTaskTest {
public void setUp() {
TaskConfiguration taskConfig = new RuleAlteredJobContext(ResourceUtil.mockJobConfig()).getTaskConfigs().iterator().next();
taskConfig.getDumperConfig().setPosition(new PlaceholderPosition());
- incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(), taskConfig.getImporterConfig(), RuleAlteredContextUtil.getExecuteEngine());
+ incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
+ RuleAlteredContextUtil.getPipelineChannelFactory(), RuleAlteredContextUtil.getExecuteEngine());
}
@Test
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index b27029b..3072163 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -57,7 +57,8 @@ public final class InventoryTaskTest {
position = new PrimaryKeyPosition(0, 1000);
}
inventoryDumperConfig.setPosition(position);
- try (InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(), RuleAlteredContextUtil.getExecuteEngine())) {
+ try (InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
+ RuleAlteredContextUtil.getPipelineChannelFactory(), RuleAlteredContextUtil.getExecuteEngine())) {
inventoryTask.start();
}
}
@@ -72,7 +73,8 @@ public final class InventoryTaskTest {
position = new PrimaryKeyPosition(0, 1000);
}
inventoryDumperConfig.setPosition(position);
- try (InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(), RuleAlteredContextUtil.getExecuteEngine())) {
+ try (InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
+ RuleAlteredContextUtil.getPipelineChannelFactory(), RuleAlteredContextUtil.getExecuteEngine())) {
inventoryTask.start();
assertFalse(inventoryTask.getProgress().getPosition() instanceof FinishedPosition);
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/RuleAlteredContextUtil.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/RuleAlteredContextUtil.java
index 0fc684c..cf81516 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/RuleAlteredContextUtil.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/RuleAlteredContextUtil.java
@@ -23,7 +23,9 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Shardi
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.fixture.EmbedTestingServer;
+import org.apache.shardingsphere.data.pipeline.core.spi.ingest.channel.MemoryPipelineChannelFactory;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
import org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.mode.manager.ContextManager;
@@ -36,6 +38,8 @@ public final class RuleAlteredContextUtil {
private static final ExecuteEngine EXECUTE_ENGINE = ExecuteEngine.newCachedThreadInstance();
+ private static final PipelineChannelFactory PIPELINE_CHANNEL_FACTORY = new MemoryPipelineChannelFactory();
+
/**
* Mock mode configuration.
*/
@@ -74,4 +78,13 @@ public final class RuleAlteredContextUtil {
public static ExecuteEngine getExecuteEngine() {
return EXECUTE_ENGINE;
}
+
+ /**
+ * Get pipeline channel factory.
+ *
+ * @return channel factory
+ */
+ public static PipelineChannelFactory getPipelineChannelFactory() {
+ return PIPELINE_CHANNEL_FACTORY;
+ }
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_source.yaml b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_source.yaml
index e6e492a..55d7c32 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_source.yaml
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_source.yaml
@@ -51,7 +51,6 @@ rules:
scalingName: default_scaling
scaling:
default_scaling:
- blockQueueSize: 10000
input:
workerThread: 40
batchSize: 1000
@@ -66,6 +65,10 @@ rules:
type: TPS
props:
tps: 2000
+ streamChannel:
+ type: MEMORY
+ props:
+ block-queue-size: 10000
completionDetector:
type: FIXTURE
dataConsistencyChecker:
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_target.yaml b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_target.yaml
index b569b7d..02c7e6c 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_target.yaml
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/config_sharding_sphere_jdbc_target.yaml
@@ -54,7 +54,6 @@ rules:
scalingName: default_scaling
scaling:
default_scaling:
- blockQueueSize: 10000
input:
workerThread: 40
batchSize: 1000
@@ -69,6 +68,10 @@ rules:
type: TPS
props:
tps: 2000
+ streamChannel:
+ type: MEMORY
+ props:
+ block-queue-size: 10000
completionDetector:
type: FIXTURE
dataConsistencyChecker: