You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/18 13:10:57 UTC
[flink] 01/19: [hotfix] Fix parallelism consolidation logic in the
(Stream)ExecutionEnvironment.
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3ca53c6dd68c54bc34dc397ea7e764d14e20f8aa
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Thu Nov 14 15:41:57 2019 +0100
[hotfix] Fix parallelism consolidation logic in the (Stream)ExecutionEnvironment.
---
.../java/org/apache/flink/api/java/ExecutionEnvironment.java | 10 ++--------
.../streaming/api/environment/StreamExecutionEnvironment.java | 10 ++--------
2 files changed, 4 insertions(+), 16 deletions(-)
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 1c6fca1..5b07843 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -801,15 +801,9 @@ public class ExecutionEnvironment {
}
private void consolidateParallelismDefinitionsInConfiguration() {
- final int execParallelism = getParallelism();
- if (execParallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
- return;
+ if (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT) {
+ configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).ifPresent(this::setParallelism);
}
-
- // if parallelism is set in the ExecutorConfig, then
- // that value takes precedence over any other value.
-
- configuration.set(CoreOptions.DEFAULT_PARALLELISM, execParallelism);
}
/**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 721869c..3870b52 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1556,15 +1556,9 @@ public class StreamExecutionEnvironment {
}
private void consolidateParallelismDefinitionsInConfiguration() {
- final int execParallelism = getParallelism();
- if (execParallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
- return;
+ if (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT) {
+ configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).ifPresent(this::setParallelism);
}
-
- // if parallelism is set in the ExecutorConfig, then
- // that value takes precedence over any other value.
-
- configuration.set(CoreOptions.DEFAULT_PARALLELISM, execParallelism);
}
/**