You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/12 19:05:37 UTC

[flink] 10/10: [FLINK-13250][blink runner] Make sure that all nodes have a concrete resource profile

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d56c45a090de689a01fc8ddb884e6baa29568322
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Jul 12 18:28:36 2019 +0200

    [FLINK-13250][blink runner] Make sure that all nodes have a concrete resource profile
    
    This change is covered by various existing integration tests that failed prior to this fix.
---
 .../src/main/java/org/apache/flink/table/executor/BatchExecutor.java  | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
index 8a90b80..f4bb2ee 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
@@ -84,7 +84,9 @@ public class BatchExecutor extends ExecutorBase {
 		// All transformations should set managed memory size.
 		ResourceSpec managedResourceSpec = NodeResourceUtil.fromManagedMem(0);
 		streamGraph.getStreamNodes().forEach(sn -> {
-			sn.setResources(sn.getMinResources().merge(managedResourceSpec), sn.getPreferredResources().merge(managedResourceSpec));
+			if (sn.getMinResources().equals(ResourceSpec.DEFAULT)) {
+				sn.setResources(managedResourceSpec, managedResourceSpec);
+			}
 		});
 		streamGraph.setChaining(true);
 		streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);