You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by su...@apache.org on 2022/10/06 17:05:01 UTC

[shardingsphere] branch master updated: Refactor YamlPipelineProcessConfigurationSwapperTest (#21363)

This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f97eed193a6 Refactor YamlPipelineProcessConfigurationSwapperTest (#21363)
f97eed193a6 is described below

commit f97eed193a63f3197b9467798a02e2c1742aa469
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Fri Oct 7 01:04:52 2022 +0800

    Refactor YamlPipelineProcessConfigurationSwapperTest (#21363)
    
    * Refactor YamlPipelineProcessConfigurationSwapperTest
---
 ...amlPipelineProcessConfigurationSwapperTest.java | 68 +++++++++++++++++-----
 1 file changed, 55 insertions(+), 13 deletions(-)

diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/yaml/process/YamlPipelineProcessConfigurationSwapperTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/yaml/process/YamlPipelineProcessConfigurationSwapperTest.java
index 0bd1d40eab4..ac4f7062286 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/yaml/process/YamlPipelineProcessConfigurationSwapperTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/yaml/process/YamlPipelineProcessConfigurationSwapperTest.java
@@ -18,46 +18,88 @@
 package org.apache.shardingsphere.data.pipeline.yaml.process;
 
 import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineReadConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineWriteConfiguration;
+import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
 import org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
 import org.junit.Test;
 
 import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertNull;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertNull;
 
 public final class YamlPipelineProcessConfigurationSwapperTest {
     
     @Test
-    public void assertSwap() {
-        YamlPipelineProcessConfiguration yamlConfig = new YamlPipelineProcessConfiguration();
+    public void assertSwapToObject() {
+        PipelineProcessConfiguration actual = new YamlPipelineProcessConfigurationSwapper().swapToObject(createYamlConfiguration());
+        assertThat(actual.getRead().getWorkerThread(), is(40));
+        assertThat(actual.getRead().getBatchSize(), is(1000));
+        assertThat(actual.getRead().getShardingSize(), is(10000000));
+        assertThat(actual.getRead().getRateLimiter().getType(), is("INPUT"));
+        assertThat(actual.getRead().getRateLimiter().getProps().getProperty("batch-size"), is("1000"));
+        assertThat(actual.getRead().getRateLimiter().getProps().getProperty("qps"), is("50"));
+        assertThat(actual.getWrite().getWorkerThread(), is(40));
+        assertThat(actual.getWrite().getBatchSize(), is(1000));
+        assertThat(actual.getWrite().getRateLimiter().getType(), is("OUTPUT"));
+        assertThat(actual.getWrite().getRateLimiter().getProps().getProperty("batch-size"), is("1000"));
+        assertThat(actual.getWrite().getRateLimiter().getProps().getProperty("qps"), is("50"));
+        assertThat(actual.getStreamChannel().getType(), is("MEMORY"));
+        assertThat(actual.getStreamChannel().getProps().getProperty("block-queue-size"), is("10000"));
+    }
+    
+    private YamlPipelineProcessConfiguration createYamlConfiguration() {
         Properties rateLimiterProps = new Properties();
         rateLimiterProps.setProperty("batch-size", "1000");
         rateLimiterProps.setProperty("qps", "50");
         YamlPipelineReadConfiguration yamlInputConfig = YamlPipelineReadConfiguration.buildWithDefaultValue();
-        yamlConfig.setRead(yamlInputConfig);
         yamlInputConfig.setRateLimiter(new YamlAlgorithmConfiguration("INPUT", rateLimiterProps));
+        YamlPipelineProcessConfiguration result = new YamlPipelineProcessConfiguration();
+        result.setRead(yamlInputConfig);
         YamlPipelineWriteConfiguration yamlOutputConfig = YamlPipelineWriteConfiguration.buildWithDefaultValue();
         yamlOutputConfig.setRateLimiter(new YamlAlgorithmConfiguration("OUTPUT", rateLimiterProps));
-        yamlConfig.setWrite(yamlOutputConfig);
+        result.setWrite(yamlOutputConfig);
+        Properties streamChannelProps = new Properties();
+        streamChannelProps.setProperty("block-queue-size", "10000");
+        result.setStreamChannel(new YamlAlgorithmConfiguration("MEMORY", streamChannelProps));
+        return result;
+    }
+    
+    @Test
+    public void assertSwapToYamlConfiguration() {
+        Properties rateLimiterProps = new Properties();
+        rateLimiterProps.setProperty("batch-size", "1000");
+        rateLimiterProps.setProperty("qps", "50");
+        PipelineReadConfiguration readConfig = new PipelineReadConfiguration(40, 1000, 10000000, new AlgorithmConfiguration("INPUT", rateLimiterProps));
+        PipelineWriteConfiguration writeConfig = new PipelineWriteConfiguration(40, 1000, new AlgorithmConfiguration("OUTPUT", rateLimiterProps));
         Properties streamChannelProps = new Properties();
         streamChannelProps.setProperty("block-queue-size", "10000");
-        yamlConfig.setStreamChannel(new YamlAlgorithmConfiguration("MEMORY", streamChannelProps));
-        YamlPipelineProcessConfigurationSwapper swapper = new YamlPipelineProcessConfigurationSwapper();
-        PipelineProcessConfiguration actualConfig = swapper.swapToObject(yamlConfig);
-        YamlPipelineProcessConfiguration actualYamlConfig = swapper.swapToYamlConfiguration(actualConfig);
-        assertThat(YamlEngine.marshal(actualYamlConfig), is(YamlEngine.marshal(yamlConfig)));
+        PipelineProcessConfiguration config = new PipelineProcessConfiguration(readConfig, writeConfig, new AlgorithmConfiguration("MEMORY", streamChannelProps));
+        YamlPipelineProcessConfiguration actual = new YamlPipelineProcessConfigurationSwapper().swapToYamlConfiguration(config);
+        assertThat(actual.getRead().getWorkerThread(), is(40));
+        assertThat(actual.getRead().getBatchSize(), is(1000));
+        assertThat(actual.getRead().getShardingSize(), is(10000000));
+        assertThat(actual.getRead().getRateLimiter().getType(), is("INPUT"));
+        assertThat(actual.getRead().getRateLimiter().getProps().getProperty("batch-size"), is("1000"));
+        assertThat(actual.getRead().getRateLimiter().getProps().getProperty("qps"), is("50"));
+        assertThat(actual.getWrite().getWorkerThread(), is(40));
+        assertThat(actual.getWrite().getBatchSize(), is(1000));
+        assertThat(actual.getWrite().getRateLimiter().getType(), is("OUTPUT"));
+        assertThat(actual.getWrite().getRateLimiter().getProps().getProperty("batch-size"), is("1000"));
+        assertThat(actual.getWrite().getRateLimiter().getProps().getProperty("qps"), is("50"));
+        assertThat(actual.getStreamChannel().getType(), is("MEMORY"));
+        assertThat(actual.getStreamChannel().getProps().getProperty("block-queue-size"), is("10000"));
     }
     
     @Test
-    public void assertYamlConfigNull() {
+    public void assertSwapToYamlConfigurationWithNull() {
         assertNull(new YamlPipelineProcessConfigurationSwapper().swapToYamlConfiguration(null));
     }
     
     @Test
-    public void assertConfigNull() {
+    public void assertSwapToObjectWithNull() {
         assertNull(new YamlPipelineProcessConfigurationSwapper().swapToObject(null));
     }
 }