You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/07 05:47:30 UTC
[incubator-seatunnel] branch dev updated: [hotfix][engine] Fix execution parallelism not working (#2990)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 9c187e2d2 [hotfix][engine] Fix execution parallelism not working (#2990)
9c187e2d2 is described below
commit 9c187e2d2724a64983b69af490e138cc8672c7a6
Author: liugddx <80...@qq.com>
AuthorDate: Fri Oct 7 13:47:24 2022 +0800
[hotfix][engine] Fix execution parallelism not working (#2990)
---
.../seatunnel/engine/core/parse/JobConfigParser.java | 14 +++++++++++---
.../server/dag/execution/ExecutionPlanGenerator.java | 2 +-
2 files changed, 12 insertions(+), 4 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index 6c2b6ea60..e817ed490 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -76,17 +76,21 @@ public class JobConfigParser {
private JobConfig jobConfig;
+ private Config seaTunnelJobConfig;
+
+ private Config envConfigs;
+
public JobConfigParser(@NonNull String jobDefineFilePath,
@NonNull IdGenerator idGenerator,
@NonNull JobConfig jobConfig) {
this.jobDefineFilePath = jobDefineFilePath;
this.idGenerator = idGenerator;
this.jobConfig = jobConfig;
+ this.seaTunnelJobConfig = new ConfigBuilder(Paths.get(jobDefineFilePath)).getConfig();
+ this.envConfigs = seaTunnelJobConfig.getConfig("env");
}
public ImmutablePair<List<Action>, Set<URL>> parse() {
- Config seaTunnelJobConfig = new ConfigBuilder(Paths.get(jobDefineFilePath)).getConfig();
- Config envConfigs = seaTunnelJobConfig.getConfig("env");
List<? extends Config> sinkConfigs = seaTunnelJobConfig.getConfigList("sink");
List<? extends Config> transformConfigs = seaTunnelJobConfig.getConfigList("transform");
List<? extends Config> sourceConfigs = seaTunnelJobConfig.getConfigList("source");
@@ -322,7 +326,11 @@ public class JobConfigParser {
int sourceParallelism = sourceConfig.getInt(CollectionConstants.PARALLELISM);
return Math.max(sourceParallelism, 1);
}
- return 1;
+ int executionParallelism = 0;
+ if (envConfigs.hasPath(CollectionConstants.PARALLELISM)) {
+ executionParallelism = envConfigs.getInt(CollectionConstants.PARALLELISM);
+ }
+ return Math.max(executionParallelism, 1);
}
private SourceAction createSourceAction(long id,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index 25aa11ef5..98461678f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -194,7 +194,7 @@ public class ExecutionPlanGenerator {
action.getName(),
((SinkAction<?, ?, ?, ?>) action).getSink(),
action.getJarUrls());
- } else if (action instanceof SourceAction){
+ } else if (action instanceof SourceAction) {
newAction = new SourceAction<>(id,
action.getName(),
((SourceAction<?, ?, ?>) action).getSource(),