You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/06/16 09:14:40 UTC
[flink] 05/07: [FLINK-17018][runtime] Use OneSlotPerExecutionSlotAllocator on pipelined region scheduling
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d75f186d99e8699a2dac802de803b25fdf612e01
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Sat Jun 6 13:26:28 2020 +0800
[FLINK-17018][runtime] Use OneSlotPerExecutionSlotAllocator on pipelined region scheduling
---
.../runtime/scheduler/DefaultSchedulerFactory.java | 34 +++++++++++--
.../OneSlotPerExecutionSlotAllocatorFactory.java | 55 ++++++++++++++++++++++
2 files changed, 84 insertions(+), 5 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index 1671b5e..333472b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
@@ -79,10 +80,12 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
.create();
log.info("Using restart back off time strategy {} for {} ({}).", restartBackoffTimeStrategy, jobGraph.getName(), jobGraph.getJobID());
- final SlotProviderStrategy slotProviderStrategy = SlotProviderStrategy.from(
- jobGraph.getScheduleMode(),
- slotProvider,
- slotRequestTimeout);
+ final ExecutionSlotAllocatorFactory slotAllocatorFactory =
+ createExecutionSlotAllocatorFactory(
+ jobGraph.getScheduleMode(),
+ slotProvider,
+ slotRequestTimeout,
+ schedulingStrategyFactory);
return new DefaultScheduler(
log,
@@ -104,7 +107,7 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
restartBackoffTimeStrategy,
new DefaultExecutionVertexOperations(),
new ExecutionVertexVersioner(),
- new DefaultExecutionSlotAllocatorFactory(slotProviderStrategy));
+ slotAllocatorFactory);
}
static SchedulingStrategyFactory createSchedulingStrategyFactory(final ScheduleMode scheduleMode) {
@@ -118,4 +121,25 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
throw new IllegalStateException("Unsupported schedule mode " + scheduleMode);
}
}
+
+ private static ExecutionSlotAllocatorFactory createExecutionSlotAllocatorFactory(
+ final ScheduleMode scheduleMode,
+ final SlotProvider slotProvider,
+ final Time slotRequestTimeout,
+ final SchedulingStrategyFactory schedulingStrategyFactory) {
+
+ if (schedulingStrategyFactory instanceof PipelinedRegionSchedulingStrategy.Factory) {
+ return new OneSlotPerExecutionSlotAllocatorFactory(
+ slotProvider,
+ scheduleMode != ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
+ slotRequestTimeout);
+ } else {
+ final SlotProviderStrategy slotProviderStrategy = SlotProviderStrategy.from(
+ scheduleMode,
+ slotProvider,
+ slotRequestTimeout);
+
+ return new DefaultExecutionSlotAllocatorFactory(slotProviderStrategy);
+ }
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java
new file mode 100644
index 0000000..8412b05
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Factory for {@link OneSlotPerExecutionSlotAllocator}.
+ */
+class OneSlotPerExecutionSlotAllocatorFactory implements ExecutionSlotAllocatorFactory {
+
+ private final SlotProvider slotProvider;
+
+ private final boolean slotWillBeOccupiedIndefinitely;
+
+ private final Time allocationTimeout;
+
+ OneSlotPerExecutionSlotAllocatorFactory(
+ final SlotProvider slotProvider,
+ final boolean slotWillBeOccupiedIndefinitely,
+ final Time allocationTimeout) {
+ this.slotProvider = checkNotNull(slotProvider);
+ this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
+ this.allocationTimeout = checkNotNull(allocationTimeout);
+ }
+
+ @Override
+ public ExecutionSlotAllocator createInstance(final PreferredLocationsRetriever preferredLocationsRetriever) {
+ return new OneSlotPerExecutionSlotAllocator(
+ slotProvider,
+ preferredLocationsRetriever,
+ slotWillBeOccupiedIndefinitely,
+ allocationTimeout);
+ }
+}