You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/07 05:47:30 UTC

[incubator-seatunnel] branch dev updated: [hotfix][engine] Fix execution parallelism not working (#2990)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9c187e2d2 [hotfix][engine] Fix execution parallelism not working (#2990)
9c187e2d2 is described below

commit 9c187e2d2724a64983b69af490e138cc8672c7a6
Author: liugddx <80...@qq.com>
AuthorDate: Fri Oct 7 13:47:24 2022 +0800

    [hotfix][engine] Fix execution parallelism not working (#2990)
---
 .../seatunnel/engine/core/parse/JobConfigParser.java       | 14 +++++++++++---
 .../server/dag/execution/ExecutionPlanGenerator.java       |  2 +-
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index 6c2b6ea60..e817ed490 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -76,17 +76,21 @@ public class JobConfigParser {
 
     private JobConfig jobConfig;
 
+    private Config seaTunnelJobConfig;
+
+    private Config envConfigs;
+
     public JobConfigParser(@NonNull String jobDefineFilePath,
                            @NonNull IdGenerator idGenerator,
                            @NonNull JobConfig jobConfig) {
         this.jobDefineFilePath = jobDefineFilePath;
         this.idGenerator = idGenerator;
         this.jobConfig = jobConfig;
+        this.seaTunnelJobConfig = new ConfigBuilder(Paths.get(jobDefineFilePath)).getConfig();
+        this.envConfigs = seaTunnelJobConfig.getConfig("env");
     }
 
     public ImmutablePair<List<Action>, Set<URL>> parse() {
-        Config seaTunnelJobConfig = new ConfigBuilder(Paths.get(jobDefineFilePath)).getConfig();
-        Config envConfigs = seaTunnelJobConfig.getConfig("env");
         List<? extends Config> sinkConfigs = seaTunnelJobConfig.getConfigList("sink");
         List<? extends Config> transformConfigs = seaTunnelJobConfig.getConfigList("transform");
         List<? extends Config> sourceConfigs = seaTunnelJobConfig.getConfigList("source");
@@ -322,7 +326,11 @@ public class JobConfigParser {
             int sourceParallelism = sourceConfig.getInt(CollectionConstants.PARALLELISM);
             return Math.max(sourceParallelism, 1);
         }
-        return 1;
+        int executionParallelism = 0;
+        if (envConfigs.hasPath(CollectionConstants.PARALLELISM)) {
+            executionParallelism = envConfigs.getInt(CollectionConstants.PARALLELISM);
+        }
+        return Math.max(executionParallelism, 1);
     }
 
     private SourceAction createSourceAction(long id,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index 25aa11ef5..98461678f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -194,7 +194,7 @@ public class ExecutionPlanGenerator {
                 action.getName(),
                 ((SinkAction<?, ?, ?, ?>) action).getSink(),
                 action.getJarUrls());
-        } else if (action instanceof SourceAction){
+        } else if (action instanceof SourceAction) {
             newAction = new SourceAction<>(id,
                 action.getName(),
                 ((SourceAction<?, ?, ?>) action).getSource(),