You are viewing a plain text version of this content. The canonical link for it was a hyperlink labelled "here", which is not preserved in this plain-text copy.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/05 05:50:09 UTC
[incubator-seatunnel] branch dev updated: [Hotfix][core] Fix spark engine parallelism parameter does not working (#2965)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 35e896eb6 [Hotfix][core] Fix spark engine parallelism parameter does not working (#2965)
35e896eb6 is described below
commit 35e896eb67b23e450d2ac49f6a891f39cc7826bd
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Wed Oct 5 13:50:04 2022 +0800
[Hotfix][core] Fix spark engine parallelism parameter does not working (#2965)
* [Hotfix][core] Fix spark engine parallelism parameter does not working
---
.../main/java/org/apache/seatunnel/spark/SparkEnvironment.java | 8 +++++++-
.../src/main/java/org/apache/seatunnel/common/Constants.java | 2 +-
.../core/starter/spark/execution/SourceExecuteProcessor.java | 8 ++++++++
3 files changed, 16 insertions(+), 2 deletions(-)
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
index 3e2d5e91b..b9758babe 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
@@ -46,6 +46,8 @@ public class SparkEnvironment implements RuntimeEnv {
private static final long DEFAULT_SPARK_STREAMING_DURATION = 5;
+ private SparkConf sparkConf;
+
private SparkSession sparkSession;
private StreamingContext streamingContext;
@@ -98,7 +100,7 @@ public class SparkEnvironment implements RuntimeEnv {
@Override
public SparkEnvironment prepare() {
- SparkConf sparkConf = createSparkConf();
+ sparkConf = createSparkConf();
SparkSession.Builder builder = SparkSession.builder().config(sparkConf);
if (enableHive) {
builder.enableHiveSupport();
@@ -116,6 +118,10 @@ public class SparkEnvironment implements RuntimeEnv {
return this.streamingContext;
}
+ public SparkConf getSparkConf() {
+ return this.sparkConf;
+ }
+
private SparkConf createSparkConf() {
SparkConf sparkConf = new SparkConf();
this.config.entrySet().forEach(entry -> sparkConf.set(entry.getKey(), String.valueOf(entry.getValue().unwrapped())));
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
index 8e700d89a..ad9f8e222 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
@@ -31,7 +31,7 @@ public final class Constants {
public static final String SOURCE_SERIALIZATION = "source.serialization";
- public static final String SOURCE_PARALLELISM = "source.parallelism";
+ public static final String SOURCE_PARALLELISM = "parallelism";
public static final String HDFS_ROOT = "hdfs.root";
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index f6eafe67c..eda672914 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -54,9 +54,17 @@ public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTu
List<Dataset<Row>> sources = new ArrayList<>();
for (int i = 0; i < plugins.size(); i++) {
SeaTunnelSource<?, ?, ?> source = plugins.get(i);
+ Config pluginConfig = pluginConfigs.get(i);
+ int parallelism;
+ if (pluginConfig.hasPath(Constants.SOURCE_PARALLELISM)) {
+ parallelism = pluginConfig.getInt(Constants.SOURCE_PARALLELISM);
+ } else {
+ parallelism = sparkEnvironment.getSparkConf().getInt(Constants.SOURCE_PARALLELISM, 1);
+ }
Dataset<Row> dataset = sparkEnvironment.getSparkSession()
.read()
.format(SeaTunnelSource.class.getSimpleName())
+ .option(Constants.SOURCE_PARALLELISM, parallelism)
.option(Constants.SOURCE_SERIALIZATION, SerializationUtils.objectToString(source))
.schema((StructType) TypeConverterUtils.convert(source.getProducedType())).load();
sources.add(dataset);