Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/10/18 02:51:52 UTC

[incubator-seatunnel] branch dev updated: [Improve][Seatunnel-code][Seatunnel-flink-starter] customize operator parallelism for flink and spark (#2941)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d6f826835 [Improve][Seatunnel-code][Seatunnel-flink-starter] customize operator parallelism for flink and spark (#2941)
d6f826835 is described below

commit d6f82683592237e2cf6505fccac76c58854092fc
Author: liugddx <80...@qq.com>
AuthorDate: Tue Oct 18 10:51:47 2022 +0800

    [Improve][Seatunnel-code][Seatunnel-flink-starter] customize operator parallelism for flink and spark (#2941)
    
    * customize operator parallel
---
 docs/en/connector-v2/sink/common-options.md        | 10 ++++++++
 docs/en/connector-v2/source/common-options.md      |  7 ++++++
 .../org/apache/seatunnel/common/Constants.java     |  2 --
 .../flink/execution/SinkExecuteProcessor.java      |  8 ++++++-
 .../flink/execution/SourceExecuteProcessor.java    |  5 ++++
 .../spark/execution/SinkExecuteProcessor.java      |  8 +++++++
 .../spark/execution/SourceExecuteProcessor.java    |  9 ++++----
 .../main/resources/examples/fake_to_console.conf   | 27 ++++++++++++----------
 .../src/main/resources/examples/spark.batch.conf   |  7 ++++--
 .../spark/source/SeaTunnelSourceSupport.java       |  5 ++--
 10 files changed, 65 insertions(+), 23 deletions(-)

diff --git a/docs/en/connector-v2/sink/common-options.md b/docs/en/connector-v2/sink/common-options.md
index ac4a2e428..53c623086 100644
--- a/docs/en/connector-v2/sink/common-options.md
+++ b/docs/en/connector-v2/sink/common-options.md
@@ -5,6 +5,8 @@
 | name              | type   | required | default value |
 | ----------------- | ------ | -------- | ------------- |
 | source_table_name | string | no       | -             |
+| parallelism       | int    | no       | -             |
+
 
 ### source_table_name [string]
 
@@ -12,11 +14,18 @@ When `source_table_name` is not specified, the current plug-in processes the dat
 
 When `source_table_name` is specified, the current plug-in is processing the data set corresponding to this parameter.
 
+### parallelism [int]
+
+When `parallelism` is not specified, the `parallelism` in env is used by default.
+
+When `parallelism` is specified, it overrides the `parallelism` in env.
+
 ## Examples
 
 ```bash
 source {
     FakeSourceStream {
+      parallelism = 2
       result_table_name = "fake"
       field_name = "name,age"
     }
@@ -37,6 +46,7 @@ transform {
 
 sink {
     console {
+      parallelism = 3
       source_table_name = "fake_name"
     }
 }
diff --git a/docs/en/connector-v2/source/common-options.md b/docs/en/connector-v2/source/common-options.md
index 7254c44e5..7fc32c505 100644
--- a/docs/en/connector-v2/source/common-options.md
+++ b/docs/en/connector-v2/source/common-options.md
@@ -5,6 +5,7 @@
 | name              | type   | required | default value |
 | ----------------- | ------ | -------- | ------------- |
 | result_table_name | string | no       | -             |
+| parallelism       | int    | no       | -             |
 
 ### result_table_name [string]
 
@@ -12,6 +13,12 @@ When `result_table_name` is not specified, the data processed by this plugin wil
 
 When `result_table_name` is specified, the data processed by this plugin will be registered as a data set `(dataStream/dataset)` that can be directly accessed by other plugins, or called a temporary table `(table)` . The data set `(dataStream/dataset)` registered here can be directly accessed by other plugins by specifying `source_table_name` .
 
+### parallelism [int]
+
+When `parallelism` is not specified, the `parallelism` in env is used by default.
+
+When `parallelism` is specified, it overrides the `parallelism` in env.
+
 ## Example
 
 ```bash
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
index ad9f8e222..c5a24d228 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
@@ -31,8 +31,6 @@ public final class Constants {
 
     public static final String SOURCE_SERIALIZATION = "source.serialization";
 
-    public static final String SOURCE_PARALLELISM = "parallelism";
-
     public static final String HDFS_ROOT = "hdfs.root";
 
     public static final String HDFS_USER = "hdfs.user";
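
For reference, the removed SOURCE_PARALLELISM key is replaced by CollectionConstants.PARALLELISM
at the call sites below. That class is not part of this diff; a minimal sketch of the declaration
the new code assumes, inferred from the `parallelism` key documented above:

```java
// Assumed shape only -- the real org.apache.seatunnel.common.constants.CollectionConstants
// is not shown in this commit.
public final class CollectionConstants {
    public static final String PARALLELISM = "parallelism";
}
```
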
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index a7c48832b..eb0517cfb 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.CollectionConstants;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
@@ -32,6 +33,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.common.collect.Lists;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.types.Row;
 
 import java.net.URL;
@@ -76,7 +78,11 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunn
             SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink = plugins.get(i);
             DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
             seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
-            stream.sinkTo(new FlinkSink<>(seaTunnelSink)).name(seaTunnelSink.getPluginName());
+            DataStreamSink<Row> dataStreamSink = stream.sinkTo(new FlinkSink<>(seaTunnelSink)).name(seaTunnelSink.getPluginName());
+            if (sinkConfig.hasPath(CollectionConstants.PARALLELISM)) {
+                int parallelism = sinkConfig.getInt(CollectionConstants.PARALLELISM);
+                dataStreamSink.setParallelism(parallelism);
+            }
         }
         // the sink is the last stream
         return null;
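
The Flink semantics relied on here: `setParallelism` on the returned `DataStreamSink` overrides
the environment-wide default for that one operator only. A self-contained sketch (all names are
illustrative, not SeaTunnel code):

```java
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinkParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4); // job-wide default, analogous to `parallelism` in env

        // print() returns a DataStreamSink, just like sinkTo(...) above
        DataStreamSink<String> sink = env.fromElements("a", "b", "c").print();

        sink.setParallelism(1); // per-operator override, analogous to sink-level `parallelism`
        env.execute("sink-parallelism-sketch");
    }
}
```
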
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index 1dc378a83..0ae0f151d 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -22,6 +22,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SupportCoordinate;
+import org.apache.seatunnel.common.constants.CollectionConstants;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
@@ -75,6 +76,10 @@ public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTu
                 "SeaTunnel " + internalSource.getClass().getSimpleName(),
                 internalSource.getBoundedness() == org.apache.seatunnel.api.source.Boundedness.BOUNDED);
             Config pluginConfig = pluginConfigs.get(i);
+            if (pluginConfig.hasPath(CollectionConstants.PARALLELISM)) {
+                int parallelism = pluginConfig.getInt(CollectionConstants.PARALLELISM);
+                sourceStream.setParallelism(parallelism);
+            }
             registerResultTable(pluginConfig, sourceStream);
             sources.add(sourceStream);
         }
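
The hasPath/getInt guard above is the standard Typesafe Config idiom for an optional key
(SeaTunnel uses the shaded org.apache.seatunnel.shade.com.typesafe.config package; the sketch
below uses the plain artifact):

```java
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class OptionalParallelismSketch {

    // Mirrors the guard in SourceExecuteProcessor: take the plugin-level
    // `parallelism` when present, otherwise keep the environment default.
    static int resolve(Config pluginConfig, int envDefault) {
        return pluginConfig.hasPath("parallelism")
            ? pluginConfig.getInt("parallelism")
            : envDefault;
    }

    public static void main(String[] args) {
        Config withKey = ConfigFactory.parseString("parallelism = 2");
        Config withoutKey = ConfigFactory.parseString("result_table_name = fake");
        System.out.println(resolve(withKey, 8));    // 2: plugin config wins
        System.out.println(resolve(withoutKey, 8)); // 8: falls back to the env value
    }
}
```
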
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index fbcecd7c8..c5d8276fe 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.core.starter.spark.execution;
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.CollectionConstants;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
@@ -71,6 +72,13 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunn
             Config sinkConfig = pluginConfigs.get(i);
             SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = plugins.get(i);
             Dataset<Row> dataset = fromSourceTable(sinkConfig, sparkEnvironment).orElse(input);
+            int parallelism;
+            if (sinkConfig.hasPath(CollectionConstants.PARALLELISM)) {
+                parallelism = sinkConfig.getInt(CollectionConstants.PARALLELISM);
+            } else {
+                parallelism = sparkEnvironment.getSparkConf().getInt(CollectionConstants.PARALLELISM, 1);
+            }
+            dataset.sparkSession().read().option(CollectionConstants.PARALLELISM, parallelism);
             // TODO modify checkpoint location
             seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(dataset.schema()));
             SparkSinkInjector.inject(dataset.write(), seaTunnelSink).option("checkpointLocation", "/tmp").save();
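
The sink-side fallback chain is: plugin config first, then the SparkConf value, then 1.
SparkConf.getInt already implements the last two steps; a sketch of just that behavior:

```java
import org.apache.spark.SparkConf;

public class SparkConfFallbackSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().set("parallelism", "4");

        // getInt returns the stored value, or the supplied default when the key
        // is absent -- the same fallback used above for sinks without their own
        // `parallelism` entry.
        System.out.println(conf.getInt("parallelism", 1)); // 4
        System.out.println(conf.getInt("missing.key", 1)); // 1
    }
}
```
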
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index eda672914..6f2f7901c 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.core.starter.spark.execution;
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.constants.CollectionConstants;
 import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
@@ -56,15 +57,15 @@ public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTu
             SeaTunnelSource<?, ?, ?> source = plugins.get(i);
             Config pluginConfig = pluginConfigs.get(i);
             int parallelism;
-            if (pluginConfig.hasPath(Constants.SOURCE_PARALLELISM)) {
-                parallelism = pluginConfig.getInt(Constants.SOURCE_PARALLELISM);
+            if (pluginConfig.hasPath(CollectionConstants.PARALLELISM)) {
+                parallelism = pluginConfig.getInt(CollectionConstants.PARALLELISM);
             } else {
-                parallelism = sparkEnvironment.getSparkConf().getInt(Constants.SOURCE_PARALLELISM, 1);
+                parallelism = sparkEnvironment.getSparkConf().getInt(CollectionConstants.PARALLELISM, 1);
             }
             Dataset<Row> dataset = sparkEnvironment.getSparkSession()
                 .read()
                 .format(SeaTunnelSource.class.getSimpleName())
-                .option(Constants.SOURCE_PARALLELISM, parallelism)
+                .option(CollectionConstants.PARALLELISM, parallelism)
                 .option(Constants.SOURCE_SERIALIZATION, SerializationUtils.objectToString(source))
                 .schema((StructType) TypeConverterUtils.convert(source.getProducedType())).load();
             sources.add(dataset);
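
On the source side the resolved value travels to the connector as a DataSource option; a sketch
of that producer half (the format name is a stand-in, not the real SeaTunnel class):

```java
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.SparkSession;

public class ReaderOptionSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .master("local[*]")
            .appName("reader-option-sketch")
            .getOrCreate();

        // Options set here surface in the source's DataSourceOptions.
        DataFrameReader reader = spark.read()
            .format("my.custom.Source") // stand-in for SeaTunnelSource
            .option("parallelism", 2L);

        // reader.load() would instantiate the source; it needs the named
        // format on the classpath, so it is omitted from this sketch.
        spark.stop();
    }
}
```
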
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
index 212ec1cdf..aea7d4c8c 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
@@ -28,32 +28,35 @@ env {
 
 source {
   # This is a example source plugin **only for test and demonstrate the feature source plugin**
-    FakeSource {
-      result_table_name = "fake"
-      row.num = 16
-      schema = {
-        fields {
-          name = "string"
-          age = "int"
-        }
+  FakeSource {
+    parallelism = 2
+    result_table_name = "fake"
+    row.num = 16
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
       }
     }
+  }
 
   # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
   # please go to https://seatunnel.apache.org/docs/category/source-v2
 }
 
 transform {
-    sql {
-      sql = "select name,age from fake"
-    }
+  sql {
+    sql = "select name,age from fake"
+  }
 
   # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
   # please go to https://seatunnel.apache.org/docs/category/transform
 }
 
 sink {
-  Console {}
+  Console {
+    parallelism = 3
+  }
 
   # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
   # please go to https://seatunnel.apache.org/docs/category/sink-v2
diff --git a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
index 4adb2a9ff..7692598da 100644
--- a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
+++ b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
@@ -24,7 +24,7 @@ env {
   # see available properties defined by spark: https://spark.apache.org/docs/latest/configuration.html#available-properties
   #job.mode = BATCH
   job.name = "SeaTunnel"
-  spark.executor.instances = 2
+  spark.executor.instances = 1
   spark.executor.cores = 1
   spark.executor.memory = "1g"
   spark.master = local
@@ -34,6 +34,7 @@ source {
   # This is a example input plugin **only for test and demonstrate the feature input plugin**
   FakeSource {
     row.num = 16
+    parallelism = 2
     schema = {
       fields {
         c_map = "map<string, string>"
@@ -82,7 +83,9 @@ transform {
 
 sink {
   # choose stdout output plugin to output data to console
-  Console {}
+  Console {
+    parallelism = 2
+  }
 
   # you can also you other output plugins, such as sql
   # hdfs {
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
index 22a4859aa..69b22a625 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.translation.spark.source;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.constants.CollectionConstants;
 import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.translation.spark.source.batch.BatchSourceReader;
 import org.apache.seatunnel.translation.spark.source.micro.MicroBatchSourceReader;
@@ -59,14 +60,14 @@ public class SeaTunnelSourceSupport implements DataSourceV2, ReadSupport, MicroB
     @Override
     public DataSourceReader createReader(DataSourceOptions options) {
         SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource = getSeaTunnelSource(options);
-        int parallelism = options.getInt(Constants.SOURCE_PARALLELISM, 1);
+        int parallelism = options.getInt(CollectionConstants.PARALLELISM, 1);
         return new BatchSourceReader(seaTunnelSource, parallelism);
     }
 
     @Override
     public MicroBatchReader createMicroBatchReader(Optional<StructType> rowTypeOptional, String checkpointLocation, DataSourceOptions options) {
         SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource = getSeaTunnelSource(options);
-        Integer parallelism = options.getInt(Constants.SOURCE_PARALLELISM, 1);
+        Integer parallelism = options.getInt(CollectionConstants.PARALLELISM, 1);
         Integer checkpointInterval = options.getInt(Constants.CHECKPOINT_INTERVAL, CHECKPOINT_INTERVAL_DEFAULT);
         String checkpointPath = StringUtils.replacePattern(checkpointLocation, "sources/\\d+", "sources-state");
         Configuration configuration = SparkSession.getActiveSession().get().sparkContext().hadoopConfiguration();
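
And the consumer half: in a Spark 2.4 DataSource V2 implementation the same key is read back
through DataSourceOptions with a default, exactly as in createReader above. A standalone sketch:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.sources.v2.DataSourceOptions;

public class OptionsConsumerSketch {
    public static void main(String[] args) {
        Map<String, String> map = new HashMap<>();
        map.put("parallelism", "2"); // DataSourceOptions keys are case-insensitive

        DataSourceOptions options = new DataSourceOptions(map);
        System.out.println(options.getInt("parallelism", 1)); // 2
        System.out.println(options.getInt("absent.key", 1));  // 1, the default
    }
}
```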