You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by su...@apache.org on 2022/10/06 17:05:01 UTC
[shardingsphere] branch master updated: Refactor YamlPipelineProcessConfigurationSwapperTest (#21363)
This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f97eed193a6 Refactor YamlPipelineProcessConfigurationSwapperTest (#21363)
f97eed193a6 is described below
commit f97eed193a63f3197b9467798a02e2c1742aa469
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Fri Oct 7 01:04:52 2022 +0800
Refactor YamlPipelineProcessConfigurationSwapperTest (#21363)
* Refactor YamlPipelineProcessConfigurationSwapperTest
---
...amlPipelineProcessConfigurationSwapperTest.java | 68 +++++++++++++++++-----
1 file changed, 55 insertions(+), 13 deletions(-)
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/yaml/process/YamlPipelineProcessConfigurationSwapperTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/yaml/process/YamlPipelineProcessConfigurationSwapperTest.java
index 0bd1d40eab4..ac4f7062286 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/yaml/process/YamlPipelineProcessConfigurationSwapperTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/yaml/process/YamlPipelineProcessConfigurationSwapperTest.java
@@ -18,46 +18,88 @@
package org.apache.shardingsphere.data.pipeline.yaml.process;
import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineReadConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineWriteConfiguration;
+import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
import org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
import org.junit.Test;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertNull;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertNull;
public final class YamlPipelineProcessConfigurationSwapperTest {
@Test
- public void assertSwap() {
- YamlPipelineProcessConfiguration yamlConfig = new YamlPipelineProcessConfiguration();
+ public void assertSwapToObject() {
+ PipelineProcessConfiguration actual = new YamlPipelineProcessConfigurationSwapper().swapToObject(createYamlConfiguration());
+ assertThat(actual.getRead().getWorkerThread(), is(40));
+ assertThat(actual.getRead().getBatchSize(), is(1000));
+ assertThat(actual.getRead().getShardingSize(), is(10000000));
+ assertThat(actual.getRead().getRateLimiter().getType(), is("INPUT"));
+ assertThat(actual.getRead().getRateLimiter().getProps().getProperty("batch-size"), is("1000"));
+ assertThat(actual.getRead().getRateLimiter().getProps().getProperty("qps"), is("50"));
+ assertThat(actual.getWrite().getWorkerThread(), is(40));
+ assertThat(actual.getWrite().getBatchSize(), is(1000));
+ assertThat(actual.getWrite().getRateLimiter().getType(), is("OUTPUT"));
+ assertThat(actual.getWrite().getRateLimiter().getProps().getProperty("batch-size"), is("1000"));
+ assertThat(actual.getWrite().getRateLimiter().getProps().getProperty("qps"), is("50"));
+ assertThat(actual.getStreamChannel().getType(), is("MEMORY"));
+ assertThat(actual.getStreamChannel().getProps().getProperty("block-queue-size"), is("10000"));
+ }
+
+ private YamlPipelineProcessConfiguration createYamlConfiguration() {
Properties rateLimiterProps = new Properties();
rateLimiterProps.setProperty("batch-size", "1000");
rateLimiterProps.setProperty("qps", "50");
YamlPipelineReadConfiguration yamlInputConfig = YamlPipelineReadConfiguration.buildWithDefaultValue();
- yamlConfig.setRead(yamlInputConfig);
yamlInputConfig.setRateLimiter(new YamlAlgorithmConfiguration("INPUT", rateLimiterProps));
+ YamlPipelineProcessConfiguration result = new YamlPipelineProcessConfiguration();
+ result.setRead(yamlInputConfig);
YamlPipelineWriteConfiguration yamlOutputConfig = YamlPipelineWriteConfiguration.buildWithDefaultValue();
yamlOutputConfig.setRateLimiter(new YamlAlgorithmConfiguration("OUTPUT", rateLimiterProps));
- yamlConfig.setWrite(yamlOutputConfig);
+ result.setWrite(yamlOutputConfig);
+ Properties streamChannelProps = new Properties();
+ streamChannelProps.setProperty("block-queue-size", "10000");
+ result.setStreamChannel(new YamlAlgorithmConfiguration("MEMORY", streamChannelProps));
+ return result;
+ }
+
+ @Test
+ public void assertSwapToYamlConfiguration() {
+ Properties rateLimiterProps = new Properties();
+ rateLimiterProps.setProperty("batch-size", "1000");
+ rateLimiterProps.setProperty("qps", "50");
+ PipelineReadConfiguration readConfig = new PipelineReadConfiguration(40, 1000, 10000000, new AlgorithmConfiguration("INPUT", rateLimiterProps));
+ PipelineWriteConfiguration writeConfig = new PipelineWriteConfiguration(40, 1000, new AlgorithmConfiguration("OUTPUT", rateLimiterProps));
Properties streamChannelProps = new Properties();
streamChannelProps.setProperty("block-queue-size", "10000");
- yamlConfig.setStreamChannel(new YamlAlgorithmConfiguration("MEMORY", streamChannelProps));
- YamlPipelineProcessConfigurationSwapper swapper = new YamlPipelineProcessConfigurationSwapper();
- PipelineProcessConfiguration actualConfig = swapper.swapToObject(yamlConfig);
- YamlPipelineProcessConfiguration actualYamlConfig = swapper.swapToYamlConfiguration(actualConfig);
- assertThat(YamlEngine.marshal(actualYamlConfig), is(YamlEngine.marshal(yamlConfig)));
+ PipelineProcessConfiguration config = new PipelineProcessConfiguration(readConfig, writeConfig, new AlgorithmConfiguration("MEMORY", streamChannelProps));
+ YamlPipelineProcessConfiguration actual = new YamlPipelineProcessConfigurationSwapper().swapToYamlConfiguration(config);
+ assertThat(actual.getRead().getWorkerThread(), is(40));
+ assertThat(actual.getRead().getBatchSize(), is(1000));
+ assertThat(actual.getRead().getShardingSize(), is(10000000));
+ assertThat(actual.getRead().getRateLimiter().getType(), is("INPUT"));
+ assertThat(actual.getRead().getRateLimiter().getProps().getProperty("batch-size"), is("1000"));
+ assertThat(actual.getRead().getRateLimiter().getProps().getProperty("qps"), is("50"));
+ assertThat(actual.getWrite().getWorkerThread(), is(40));
+ assertThat(actual.getWrite().getBatchSize(), is(1000));
+ assertThat(actual.getWrite().getRateLimiter().getType(), is("OUTPUT"));
+ assertThat(actual.getWrite().getRateLimiter().getProps().getProperty("batch-size"), is("1000"));
+ assertThat(actual.getWrite().getRateLimiter().getProps().getProperty("qps"), is("50"));
+ assertThat(actual.getStreamChannel().getType(), is("MEMORY"));
+ assertThat(actual.getStreamChannel().getProps().getProperty("block-queue-size"), is("10000"));
}
@Test
- public void assertYamlConfigNull() {
+ public void assertSwapToYamlConfigurationWithNull() {
assertNull(new YamlPipelineProcessConfigurationSwapper().swapToYamlConfiguration(null));
}
@Test
- public void assertConfigNull() {
+ public void assertSwapToObjectWithNull() {
assertNull(new YamlPipelineProcessConfigurationSwapper().swapToObject(null));
}
}