You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/06/16 09:14:40 UTC

[flink] 05/07: [FLINK-17018][runtime] Use OneSlotPerExecutionSlotAllocator on pipelined region scheduling

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d75f186d99e8699a2dac802de803b25fdf612e01
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Sat Jun 6 13:26:28 2020 +0800

    [FLINK-17018][runtime] Use OneSlotPerExecutionSlotAllocator on pipelined region scheduling
---
 .../runtime/scheduler/DefaultSchedulerFactory.java | 34 +++++++++++--
 .../OneSlotPerExecutionSlotAllocatorFactory.java   | 55 ++++++++++++++++++++++
 2 files changed, 84 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index 1671b5e..333472b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
 import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
 import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 
@@ -79,10 +80,12 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 			.create();
 		log.info("Using restart back off time strategy {} for {} ({}).", restartBackoffTimeStrategy, jobGraph.getName(), jobGraph.getJobID());
 
-		final SlotProviderStrategy slotProviderStrategy = SlotProviderStrategy.from(
-			jobGraph.getScheduleMode(),
-			slotProvider,
-			slotRequestTimeout);
+		final ExecutionSlotAllocatorFactory slotAllocatorFactory =
+			createExecutionSlotAllocatorFactory(
+				jobGraph.getScheduleMode(),
+				slotProvider,
+				slotRequestTimeout,
+				schedulingStrategyFactory);
 
 		return new DefaultScheduler(
 			log,
@@ -104,7 +107,7 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 			restartBackoffTimeStrategy,
 			new DefaultExecutionVertexOperations(),
 			new ExecutionVertexVersioner(),
-			new DefaultExecutionSlotAllocatorFactory(slotProviderStrategy));
+			slotAllocatorFactory);
 	}
 
 	static SchedulingStrategyFactory createSchedulingStrategyFactory(final ScheduleMode scheduleMode) {
@@ -118,4 +121,25 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 				throw new IllegalStateException("Unsupported schedule mode " + scheduleMode);
 		}
 	}
+
+	private static ExecutionSlotAllocatorFactory createExecutionSlotAllocatorFactory( // picks the slot allocator factory for the given scheduling strategy factory
+			final ScheduleMode scheduleMode,
+			final SlotProvider slotProvider,
+			final Time slotRequestTimeout,
+			final SchedulingStrategyFactory schedulingStrategyFactory) {
+
+		if (schedulingStrategyFactory instanceof PipelinedRegionSchedulingStrategy.Factory) { // pipelined region scheduling gets the one-slot-per-execution allocator
+			return new OneSlotPerExecutionSlotAllocatorFactory(
+				slotProvider,
+				scheduleMode != ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST, // slotWillBeOccupiedIndefinitely: false only for the batch-slot-request mode
+				slotRequestTimeout); // passed in the constructor's allocationTimeout position
+		} else { // every other strategy factory keeps the SlotProviderStrategy-backed default allocator
+			final SlotProviderStrategy slotProviderStrategy = SlotProviderStrategy.from(
+				scheduleMode,
+				slotProvider,
+				slotRequestTimeout);
+
+			return new DefaultExecutionSlotAllocatorFactory(slotProviderStrategy);
+		}
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java
new file mode 100644
index 0000000..8412b05
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Factory for {@link OneSlotPerExecutionSlotAllocator}; each created allocator shares this factory's slot provider, occupancy flag and allocation timeout.
+ */
+class OneSlotPerExecutionSlotAllocatorFactory implements ExecutionSlotAllocatorFactory {
+
+	private final SlotProvider slotProvider; // handed unchanged to every allocator this factory creates
+
+	private final boolean slotWillBeOccupiedIndefinitely; // forwarded as-is; NOTE(review): exact semantics are defined by OneSlotPerExecutionSlotAllocator, not visible here
+
+	private final Time allocationTimeout; // forwarded as-is to the allocator constructor
+
+	OneSlotPerExecutionSlotAllocatorFactory(
+			final SlotProvider slotProvider,
+			final boolean slotWillBeOccupiedIndefinitely,
+			final Time allocationTimeout) {
+		this.slotProvider = checkNotNull(slotProvider); // null-checked at construction time
+		this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
+		this.allocationTimeout = checkNotNull(allocationTimeout); // null-checked at construction time
+	}
+
+	@Override
+	public ExecutionSlotAllocator createInstance(final PreferredLocationsRetriever preferredLocationsRetriever) {
+		return new OneSlotPerExecutionSlotAllocator( // a new allocator on every call; only the retriever comes from the caller
+			slotProvider,
+			preferredLocationsRetriever,
+			slotWillBeOccupiedIndefinitely,
+			allocationTimeout);
+	}
+}