You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/10/18 02:51:52 UTC
[incubator-seatunnel] branch dev updated: [Imporve][Seatunnel-code][Seatunnel-flink-starter] customize operator parallel for flink and spark (#2941)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d6f826835 [Imporve][Seatunnel-code][Seatunnel-flink-starter] customize operator parallel for flink and spark (#2941)
d6f826835 is described below
commit d6f82683592237e2cf6505fccac76c58854092fc
Author: liugddx <80...@qq.com>
AuthorDate: Tue Oct 18 10:51:47 2022 +0800
[Imporve][Seatunnel-code][Seatunnel-flink-starter] customize operator parallel for flink and spark (#2941)
* customize operator parallel
---
docs/en/connector-v2/sink/common-options.md | 10 ++++++++
docs/en/connector-v2/source/common-options.md | 7 ++++++
.../org/apache/seatunnel/common/Constants.java | 2 --
.../flink/execution/SinkExecuteProcessor.java | 8 ++++++-
.../flink/execution/SourceExecuteProcessor.java | 5 ++++
.../spark/execution/SinkExecuteProcessor.java | 8 +++++++
.../spark/execution/SourceExecuteProcessor.java | 9 ++++----
.../main/resources/examples/fake_to_console.conf | 27 ++++++++++++----------
.../src/main/resources/examples/spark.batch.conf | 7 ++++--
.../spark/source/SeaTunnelSourceSupport.java | 5 ++--
10 files changed, 65 insertions(+), 23 deletions(-)
diff --git a/docs/en/connector-v2/sink/common-options.md b/docs/en/connector-v2/sink/common-options.md
index ac4a2e428..53c623086 100644
--- a/docs/en/connector-v2/sink/common-options.md
+++ b/docs/en/connector-v2/sink/common-options.md
@@ -5,6 +5,8 @@
| name | type | required | default value |
| ----------------- | ------ | -------- | ------------- |
| source_table_name | string | no | - |
+| parallelism | int | no | - |
+
### source_table_name [string]
@@ -12,11 +14,18 @@ When `source_table_name` is not specified, the current plug-in processes the dat
When `source_table_name` is specified, the current plug-in is processing the data set corresponding to this parameter.
+### parallelism [int]
+
+When `parallelism` is not specified, the `parallelism` in env is used by default.
+
+When `parallelism` is specified, it overrides the `parallelism` in env.
+
## Examples
```bash
source {
FakeSourceStream {
+ parallelism = 2
result_table_name = "fake"
field_name = "name,age"
}
@@ -37,6 +46,7 @@ transform {
sink {
console {
+ parallelism = 3
source_table_name = "fake_name"
}
}
diff --git a/docs/en/connector-v2/source/common-options.md b/docs/en/connector-v2/source/common-options.md
index 7254c44e5..7fc32c505 100644
--- a/docs/en/connector-v2/source/common-options.md
+++ b/docs/en/connector-v2/source/common-options.md
@@ -5,6 +5,7 @@
| name | type | required | default value |
| ----------------- | ------ | -------- | ------------- |
| result_table_name | string | no | - |
+| parallelism | int | no | - |
### result_table_name [string]
@@ -12,6 +13,12 @@ When `result_table_name` is not specified, the data processed by this plugin wil
When `result_table_name` is specified, the data processed by this plugin will be registered as a data set `(dataStream/dataset)` that can be directly accessed by other plugins, or called a temporary table `(table)` . The data set `(dataStream/dataset)` registered here can be directly accessed by other plugins by specifying `source_table_name` .
+### parallelism [int]
+
+When `parallelism` is not specified, the `parallelism` in env is used by default.
+
+When `parallelism` is specified, it overrides the `parallelism` in env.
+
## Example
```bash
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
index ad9f8e222..c5a24d228 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
@@ -31,8 +31,6 @@ public final class Constants {
public static final String SOURCE_SERIALIZATION = "source.serialization";
- public static final String SOURCE_PARALLELISM = "parallelism";
-
public static final String HDFS_ROOT = "hdfs.root";
public static final String HDFS_USER = "hdfs.user";
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index a7c48832b..eb0517cfb 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
@@ -32,6 +33,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.types.Row;
import java.net.URL;
@@ -76,7 +78,11 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunn
SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink = plugins.get(i);
DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
- stream.sinkTo(new FlinkSink<>(seaTunnelSink)).name(seaTunnelSink.getPluginName());
+ DataStreamSink<Row> dataStreamSink = stream.sinkTo(new FlinkSink<>(seaTunnelSink)).name(seaTunnelSink.getPluginName());
+ if (sinkConfig.hasPath(CollectionConstants.PARALLELISM)) {
+ int parallelism = sinkConfig.getInt(CollectionConstants.PARALLELISM);
+ dataStreamSink.setParallelism(parallelism);
+ }
}
// the sink is the last stream
return null;
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index 1dc378a83..0ae0f151d 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -22,6 +22,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportCoordinate;
+import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
@@ -75,6 +76,10 @@ public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTu
"SeaTunnel " + internalSource.getClass().getSimpleName(),
internalSource.getBoundedness() == org.apache.seatunnel.api.source.Boundedness.BOUNDED);
Config pluginConfig = pluginConfigs.get(i);
+ if (pluginConfig.hasPath(CollectionConstants.PARALLELISM)) {
+ int parallelism = pluginConfig.getInt(CollectionConstants.PARALLELISM);
+ sourceStream.setParallelism(parallelism);
+ }
registerResultTable(pluginConfig, sourceStream);
sources.add(sourceStream);
}
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index fbcecd7c8..c5d8276fe 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.core.starter.spark.execution;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
@@ -71,6 +72,13 @@ public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunn
Config sinkConfig = pluginConfigs.get(i);
SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = plugins.get(i);
Dataset<Row> dataset = fromSourceTable(sinkConfig, sparkEnvironment).orElse(input);
+ int parallelism;
+ if (sinkConfig.hasPath(CollectionConstants.PARALLELISM)) {
+ parallelism = sinkConfig.getInt(CollectionConstants.PARALLELISM);
+ } else {
+ parallelism = sparkEnvironment.getSparkConf().getInt(CollectionConstants.PARALLELISM, 1);
+ }
+ dataset.sparkSession().read().option(CollectionConstants.PARALLELISM, parallelism);
// TODO modify checkpoint location
seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(dataset.schema()));
SparkSinkInjector.inject(dataset.write(), seaTunnelSink).option("checkpointLocation", "/tmp").save();
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index eda672914..6f2f7901c 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.core.starter.spark.execution;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
@@ -56,15 +57,15 @@ public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTu
SeaTunnelSource<?, ?, ?> source = plugins.get(i);
Config pluginConfig = pluginConfigs.get(i);
int parallelism;
- if (pluginConfig.hasPath(Constants.SOURCE_PARALLELISM)) {
- parallelism = pluginConfig.getInt(Constants.SOURCE_PARALLELISM);
+ if (pluginConfig.hasPath(CollectionConstants.PARALLELISM)) {
+ parallelism = pluginConfig.getInt(CollectionConstants.PARALLELISM);
} else {
- parallelism = sparkEnvironment.getSparkConf().getInt(Constants.SOURCE_PARALLELISM, 1);
+ parallelism = sparkEnvironment.getSparkConf().getInt(CollectionConstants.PARALLELISM, 1);
}
Dataset<Row> dataset = sparkEnvironment.getSparkSession()
.read()
.format(SeaTunnelSource.class.getSimpleName())
- .option(Constants.SOURCE_PARALLELISM, parallelism)
+ .option(CollectionConstants.PARALLELISM, parallelism)
.option(Constants.SOURCE_SERIALIZATION, SerializationUtils.objectToString(source))
.schema((StructType) TypeConverterUtils.convert(source.getProducedType())).load();
sources.add(dataset);
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
index 212ec1cdf..aea7d4c8c 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
@@ -28,32 +28,35 @@ env {
source {
# This is an example source plugin **only for testing and demonstrating the source plugin feature**
- FakeSource {
- result_table_name = "fake"
- row.num = 16
- schema = {
- fields {
- name = "string"
- age = "int"
- }
+ FakeSource {
+ parallelism = 2
+ result_table_name = "fake"
+ row.num = 16
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
}
}
+ }
# If you would like to get more information about how to configure seatunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/category/source-v2
}
transform {
- sql {
- sql = "select name,age from fake"
- }
+ sql {
+ sql = "select name,age from fake"
+ }
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/category/transform
}
sink {
- Console {}
+ Console {
+ parallelism = 3
+ }
# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/category/sink-v2
diff --git a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
index 4adb2a9ff..7692598da 100644
--- a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
+++ b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
@@ -24,7 +24,7 @@ env {
# see available properties defined by spark: https://spark.apache.org/docs/latest/configuration.html#available-properties
#job.mode = BATCH
job.name = "SeaTunnel"
- spark.executor.instances = 2
+ spark.executor.instances = 1
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
@@ -34,6 +34,7 @@ source {
# This is an example input plugin **only for testing and demonstrating the input plugin feature**
FakeSource {
row.num = 16
+ parallelism = 2
schema = {
fields {
c_map = "map<string, string>"
@@ -82,7 +83,9 @@ transform {
sink {
# choose stdout output plugin to output data to console
- Console {}
+ Console {
+ parallelism = 2
+ }
# you can also use other output plugins, such as sql
# hdfs {
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
index 22a4859aa..69b22a625 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.translation.spark.source;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.translation.spark.source.batch.BatchSourceReader;
import org.apache.seatunnel.translation.spark.source.micro.MicroBatchSourceReader;
@@ -59,14 +60,14 @@ public class SeaTunnelSourceSupport implements DataSourceV2, ReadSupport, MicroB
@Override
public DataSourceReader createReader(DataSourceOptions options) {
SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource = getSeaTunnelSource(options);
- int parallelism = options.getInt(Constants.SOURCE_PARALLELISM, 1);
+ int parallelism = options.getInt(CollectionConstants.PARALLELISM, 1);
return new BatchSourceReader(seaTunnelSource, parallelism);
}
@Override
public MicroBatchReader createMicroBatchReader(Optional<StructType> rowTypeOptional, String checkpointLocation, DataSourceOptions options) {
SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource = getSeaTunnelSource(options);
- Integer parallelism = options.getInt(Constants.SOURCE_PARALLELISM, 1);
+ Integer parallelism = options.getInt(CollectionConstants.PARALLELISM, 1);
Integer checkpointInterval = options.getInt(Constants.CHECKPOINT_INTERVAL, CHECKPOINT_INTERVAL_DEFAULT);
String checkpointPath = StringUtils.replacePattern(checkpointLocation, "sources/\\d+", "sources-state");
Configuration configuration = SparkSession.getActiveSession().get().sparkContext().hadoopConfiguration();