You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/23 01:33:33 UTC

[flink] branch master updated: [FLINK-13221][table-planner-blink] Blink planner should set ScheduleMode to LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST for batch jobs

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d5cb1b9  [FLINK-13221][table-planner-blink] Blink planner should set ScheduleMode to LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST for batch jobs
d5cb1b9 is described below

commit d5cb1b9e5aefdc73026985f67a991f099e8f4364
Author: liyafan82 <fa...@foxmail.com>
AuthorDate: Mon Jul 22 10:59:11 2019 +0800

    [FLINK-13221][table-planner-blink] Blink planner should set ScheduleMode to LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST for batch jobs
    
    This closes #9101
---
 .../AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java     | 9 +++++++++
 .../main/java/org/apache/flink/table/executor/BatchExecutor.java | 2 +-
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
index c203062..3c1ca2b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
@@ -456,6 +456,15 @@ public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLog
 		}
 
 		@Override
+		public CompletableFuture<LogicalSlot> allocateBatchSlot(
+			final SlotRequestId slotRequestId,
+			final ScheduledUnit scheduledUnit,
+			final SlotProfile slotProfile,
+			final boolean allowQueuedScheduling) {
+			return allocateSlot(slotRequestId, scheduledUnit, slotProfile, allowQueuedScheduling, null);
+		}
+
+		@Override
 		public CompletableFuture<LogicalSlot> allocateSlot(
 				final SlotRequestId slotRequestId,
 				final ScheduledUnit scheduledUnit,
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
index f4bb2ee..adafab1 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
@@ -89,7 +89,7 @@ public class BatchExecutor extends ExecutorBase {
 			}
 		});
 		streamGraph.setChaining(true);
-		streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);
+		streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
 		streamGraph.setStateBackend(null);
 		streamGraph.getCheckpointConfig().setCheckpointInterval(Long.MAX_VALUE);
 		if (isShuffleModeAllBatch()) {