You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/05 05:50:09 UTC

[incubator-seatunnel] branch dev updated: [Hotfix][core] Fix Spark engine parallelism parameter not working (#2965)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 35e896eb6 [Hotfix][core] Fix Spark engine parallelism parameter not working (#2965)
35e896eb6 is described below

commit 35e896eb67b23e450d2ac49f6a891f39cc7826bd
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Wed Oct 5 13:50:04 2022 +0800

    [Hotfix][core] Fix Spark engine parallelism parameter not working (#2965)
    
    * [Hotfix][core] Fix Spark engine parallelism parameter not working
---
 .../main/java/org/apache/seatunnel/spark/SparkEnvironment.java    | 8 +++++++-
 .../src/main/java/org/apache/seatunnel/common/Constants.java      | 2 +-
 .../core/starter/spark/execution/SourceExecuteProcessor.java      | 8 ++++++++
 3 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
index 3e2d5e91b..b9758babe 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
@@ -46,6 +46,8 @@ public class SparkEnvironment implements RuntimeEnv {
 
     private static final long DEFAULT_SPARK_STREAMING_DURATION = 5;
 
+    private SparkConf sparkConf;
+
     private SparkSession sparkSession;
 
     private StreamingContext streamingContext;
@@ -98,7 +100,7 @@ public class SparkEnvironment implements RuntimeEnv {
 
     @Override
     public SparkEnvironment prepare() {
-        SparkConf sparkConf = createSparkConf();
+        sparkConf = createSparkConf();
         SparkSession.Builder builder = SparkSession.builder().config(sparkConf);
         if (enableHive) {
             builder.enableHiveSupport();
@@ -116,6 +118,10 @@ public class SparkEnvironment implements RuntimeEnv {
         return this.streamingContext;
     }
 
+    public SparkConf getSparkConf() {
+        return this.sparkConf;
+    }
+
     private SparkConf createSparkConf() {
         SparkConf sparkConf = new SparkConf();
         this.config.entrySet().forEach(entry -> sparkConf.set(entry.getKey(), String.valueOf(entry.getValue().unwrapped())));
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
index 8e700d89a..ad9f8e222 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
@@ -31,7 +31,7 @@ public final class Constants {
 
     public static final String SOURCE_SERIALIZATION = "source.serialization";
 
-    public static final String SOURCE_PARALLELISM = "source.parallelism";
+    public static final String SOURCE_PARALLELISM = "parallelism";
 
     public static final String HDFS_ROOT = "hdfs.root";
 
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index f6eafe67c..eda672914 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -54,9 +54,17 @@ public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTu
         List<Dataset<Row>> sources = new ArrayList<>();
         for (int i = 0; i < plugins.size(); i++) {
             SeaTunnelSource<?, ?, ?> source = plugins.get(i);
+            Config pluginConfig = pluginConfigs.get(i);
+            int parallelism;
+            if (pluginConfig.hasPath(Constants.SOURCE_PARALLELISM)) {
+                parallelism = pluginConfig.getInt(Constants.SOURCE_PARALLELISM);
+            } else {
+                parallelism = sparkEnvironment.getSparkConf().getInt(Constants.SOURCE_PARALLELISM, 1);
+            }
             Dataset<Row> dataset = sparkEnvironment.getSparkSession()
                 .read()
                 .format(SeaTunnelSource.class.getSimpleName())
+                .option(Constants.SOURCE_PARALLELISM, parallelism)
                 .option(Constants.SOURCE_SERIALIZATION, SerializationUtils.objectToString(source))
                 .schema((StructType) TypeConverterUtils.convert(source.getProducedType())).load();
             sources.add(dataset);