You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/23 01:33:33 UTC
[flink] branch master updated: [FLINK-13221][table-planner-blink] Blink planner should set ScheduleMode to LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST for batch jobs
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d5cb1b9 [FLINK-13221][table-planner-blink] Blink planner should set ScheduleMode to LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST for batch jobs
d5cb1b9 is described below
commit d5cb1b9e5aefdc73026985f67a991f099e8f4364
Author: liyafan82 <fa...@foxmail.com>
AuthorDate: Mon Jul 22 10:59:11 2019 +0800
[FLINK-13221][table-planner-blink] Blink planner should set ScheduleMode to LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST for batch jobs
This closes #9101
---
.../AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java | 9 +++++++++
.../main/java/org/apache/flink/table/executor/BatchExecutor.java | 2 +-
2 files changed, 10 insertions(+), 1 deletion(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
index c203062..3c1ca2b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
@@ -456,6 +456,15 @@ public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLog
}
@Override
+ public CompletableFuture<LogicalSlot> allocateBatchSlot(
+ final SlotRequestId slotRequestId,
+ final ScheduledUnit scheduledUnit,
+ final SlotProfile slotProfile,
+ final boolean allowQueuedScheduling) {
+ return allocateSlot(slotRequestId, scheduledUnit, slotProfile, allowQueuedScheduling, null);
+ }
+
+ @Override
public CompletableFuture<LogicalSlot> allocateSlot(
final SlotRequestId slotRequestId,
final ScheduledUnit scheduledUnit,
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
index f4bb2ee..adafab1 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
@@ -89,7 +89,7 @@ public class BatchExecutor extends ExecutorBase {
}
});
streamGraph.setChaining(true);
- streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);
+ streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
streamGraph.setStateBackend(null);
streamGraph.getCheckpointConfig().setCheckpointInterval(Long.MAX_VALUE);
if (isShuffleModeAllBatch()) {