You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/12 19:05:37 UTC
[flink] 10/10: [FLINK-13250][blink runner] Make sure that all nodes
have a concrete resource profile
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d56c45a090de689a01fc8ddb884e6baa29568322
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Jul 12 18:28:36 2019 +0200
[FLINK-13250][blink runner] Make sure that all nodes have a concrete resource profile
This change is covered by various existing integration tests that failed prior to this fix.
---
.../src/main/java/org/apache/flink/table/executor/BatchExecutor.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
index 8a90b80..f4bb2ee 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
@@ -84,7 +84,9 @@ public class BatchExecutor extends ExecutorBase {
// All transformations should set managed memory size.
ResourceSpec managedResourceSpec = NodeResourceUtil.fromManagedMem(0);
streamGraph.getStreamNodes().forEach(sn -> {
- sn.setResources(sn.getMinResources().merge(managedResourceSpec), sn.getPreferredResources().merge(managedResourceSpec));
+ if (sn.getMinResources().equals(ResourceSpec.DEFAULT)) {
+ sn.setResources(managedResourceSpec, managedResourceSpec);
+ }
});
streamGraph.setChaining(true);
streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);