You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/06/16 09:14:35 UTC

[flink] branch master updated (efd4974 -> 0aea300)

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from efd4974  [hotfix][e2e] Sync kafka 0.10 versions
     new 4b7d8db  [hotfix][runtime] Move shared static test methods of physical slot into PhysicalSlotTestUtils
     new a1c9a30  [hotfix][runtime] Move shared static test methods of slot allocator into ExecutionSlotAllocatorTestUtils
     new 6d9eb50  [FLINK-17018][runtime] Extract common logics of DefaultExecutionSlotAllocator into AbstractExecutionSlotAllocator
     new 95cf59a  [FLINK-17018][runtime] Introduce OneSlotPerExecutionSlotAllocator which will request one physical slot for each single execution vertex
     new d75f186  [FLINK-17018][runtime] Use OneSlotPerExecutionSlotAllocator on pipelined region scheduling
     new a10f1a47 [hotfix][runtime] Narrow down the access scope of DefaultExecutionSlotAllocator
     new 0aea300  [hotfix][runtime] Narrow down the access scope of SlotExecutionVertexAssignment

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../jobmaster/slotpool/PhysicalSlotRequest.java    |   6 +-
 .../jobmaster/slotpool/SingleLogicalSlot.java      |   2 +-
 .../scheduler/AbstractExecutionSlotAllocator.java  | 131 +++++++++
 .../scheduler/DefaultExecutionSlotAllocator.java   | 134 +++------
 .../DefaultExecutionSlotAllocatorFactory.java      |   4 +-
 .../runtime/scheduler/DefaultSchedulerFactory.java |  34 ++-
 .../OneSlotPerExecutionSlotAllocator.java          | 217 +++++++++++++++
 ...> OneSlotPerExecutionSlotAllocatorFactory.java} |  26 +-
 .../scheduler/SlotExecutionVertexAssignment.java   |   8 +-
 .../slotpool/AllocatedSlotOccupationTest.java      |  35 +--
 .../PhysicalSlotRequestBulkCheckerTest.java        |  28 +-
 ...upationTest.java => PhysicalSlotTestUtils.java} |  33 +--
 .../AbstractExecutionSlotAllocatorTest.java        | 178 ++++++++++++
 .../DefaultExecutionSlotAllocatorTest.java         | 151 ++--------
 .../scheduler/ExecutionSlotAllocatorTestUtils.java |  61 +++++
 .../OneSlotPerExecutionSlotAllocatorTest.java      | 304 +++++++++++++++++++++
 16 files changed, 1035 insertions(+), 317 deletions(-)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocator.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocator.java
 copy flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/{DefaultExecutionSlotAllocatorFactory.java => OneSlotPerExecutionSlotAllocatorFactory.java} (56%)
 copy flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/{AllocatedSlotOccupationTest.java => PhysicalSlotTestUtils.java} (66%)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocatorTest.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocatorTestUtils.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorTest.java


[flink] 02/07: [hotfix][runtime] Move shared static test methods of slot allocator into ExecutionSlotAllocatorTestUtils

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a1c9a30e322b0b3adfeadc3d01a2cbfced9fa74e
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Thu Jun 11 10:37:44 2020 +0800

    [hotfix][runtime] Move shared static test methods of slot allocator into ExecutionSlotAllocatorTestUtils
---
 .../DefaultExecutionSlotAllocatorTest.java         | 23 +-------
 .../scheduler/ExecutionSlotAllocatorTestUtils.java | 61 ++++++++++++++++++++++
 2 files changed, 63 insertions(+), 21 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
index a6a559e..ad74357 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
@@ -54,6 +54,8 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorTestUtils.createSchedulingRequirements;
+import static org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorTestUtils.findSlotAssignmentByExecutionVertexId;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
@@ -280,27 +282,6 @@ public class DefaultExecutionSlotAllocatorTest extends TestLogger {
 			new DefaultPreferredLocationsRetriever(stateLocationRetriever, inputsLocationsRetriever));
 	}
 
-	private List<ExecutionVertexSchedulingRequirements> createSchedulingRequirements(ExecutionVertexID... executionVertexIds) {
-		List<ExecutionVertexSchedulingRequirements> schedulingRequirements = new ArrayList<>(executionVertexIds.length);
-
-		for (ExecutionVertexID executionVertexId : executionVertexIds) {
-			schedulingRequirements.add(new ExecutionVertexSchedulingRequirements.Builder()
-					.withExecutionVertexId(executionVertexId).build());
-		}
-		return schedulingRequirements;
-	}
-
-	private SlotExecutionVertexAssignment findSlotAssignmentByExecutionVertexId(
-			ExecutionVertexID executionVertexId,
-			Collection<SlotExecutionVertexAssignment> slotExecutionVertexAssignments) {
-		return slotExecutionVertexAssignments.stream()
-				.filter(slotExecutionVertexAssignment -> slotExecutionVertexAssignment.getExecutionVertexId().equals(executionVertexId))
-				.findFirst()
-				.orElseThrow(() -> new IllegalArgumentException(String.format(
-						"SlotExecutionVertexAssignment with execution vertex id %s not found",
-						executionVertexId)));
-	}
-
 	private static class AllocationToggableSlotProvider implements SlotProvider {
 
 		private final List<Tuple3<SlotRequestId, ScheduledUnit, SlotProfile>> slotAllocationRequests = new ArrayList<>();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocatorTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocatorTestUtils.java
new file mode 100644
index 0000000..426d5f7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocatorTestUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Test utils for {@link ExecutionSlotAllocator}.
+ */
+class ExecutionSlotAllocatorTestUtils {
+
+	static List<ExecutionVertexSchedulingRequirements> createSchedulingRequirements(
+			final ExecutionVertexID... executionVertexIds) {
+
+		final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
+			new ArrayList<>(executionVertexIds.length);
+
+		for (ExecutionVertexID executionVertexId : executionVertexIds) {
+			schedulingRequirements.add(new ExecutionVertexSchedulingRequirements.Builder()
+					.withExecutionVertexId(executionVertexId).build());
+		}
+		return schedulingRequirements;
+	}
+
+	static SlotExecutionVertexAssignment findSlotAssignmentByExecutionVertexId(
+			final ExecutionVertexID executionVertexId,
+			final Collection<SlotExecutionVertexAssignment> slotExecutionVertexAssignments) {
+
+		return slotExecutionVertexAssignments.stream()
+				.filter(assignment -> assignment.getExecutionVertexId().equals(executionVertexId))
+				.findFirst()
+				.orElseThrow(
+					() ->
+						new IllegalArgumentException(String.format(
+							"SlotExecutionVertexAssignment with execution vertex id %s not found",
+							executionVertexId)));
+	}
+
+	private ExecutionSlotAllocatorTestUtils() {
+	}
+}


[flink] 01/07: [hotfix][runtime] Move shared static test methods of physical slot into PhysicalSlotTestUtils

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4b7d8db3aca409ae18dff635c70720d9a827cd06
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Sat Jun 6 02:13:36 2020 +0800

    [hotfix][runtime] Move shared static test methods of physical slot into PhysicalSlotTestUtils
---
 .../slotpool/AllocatedSlotOccupationTest.java      | 35 +++-------------------
 .../PhysicalSlotRequestBulkCheckerTest.java        | 28 ++++-------------
 ...upationTest.java => PhysicalSlotTestUtils.java} | 33 +++++---------------
 3 files changed, 17 insertions(+), 79 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotOccupationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotOccupationTest.java
index f68ff75..ce37266 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotOccupationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotOccupationTest.java
@@ -18,18 +18,12 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import static org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotTestUtils.createPhysicalSlot;
+import static org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotTestUtils.occupyPhysicalSlot;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 
@@ -41,7 +35,7 @@ public class AllocatedSlotOccupationTest extends TestLogger {
 	@Test
 	public void testSingleTaskOccupyingSlotIndefinitely() {
 		final PhysicalSlot physicalSlot = createPhysicalSlot();
-		allocateSingleLogicalSlotFromPhysicalSlot(physicalSlot, true);
+		occupyPhysicalSlot(physicalSlot, true);
 
 		assertThat(physicalSlot.willBeOccupiedIndefinitely(), is(true));
 	}
@@ -49,29 +43,8 @@ public class AllocatedSlotOccupationTest extends TestLogger {
 	@Test
 	public void testSingleTaskNotOccupyingSlotIndefinitely() {
 		final PhysicalSlot physicalSlot = createPhysicalSlot();
-		allocateSingleLogicalSlotFromPhysicalSlot(physicalSlot, false);
+		occupyPhysicalSlot(physicalSlot, false);
 
 		assertThat(physicalSlot.willBeOccupiedIndefinitely(), is(false));
 	}
-
-	private static PhysicalSlot createPhysicalSlot() {
-		return new AllocatedSlot(
-			new AllocationID(),
-			new LocalTaskManagerLocation(),
-			0,
-			ResourceProfile.ANY,
-			new SimpleAckingTaskManagerGateway());
-	}
-
-	private static LogicalSlot allocateSingleLogicalSlotFromPhysicalSlot(
-			final PhysicalSlot physicalSlot,
-			final boolean slotWillBeOccupiedIndefinitely) {
-
-		return SingleLogicalSlot.allocateFromPhysicalSlot(
-			new SlotRequestId(),
-			physicalSlot,
-			Locality.UNKNOWN,
-			new TestingSlotOwner(),
-			slotWillBeOccupiedIndefinitely);
-	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerTest.java
index 557ba54..2c58f2f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerTest.java
@@ -22,12 +22,8 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
-import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.jobmanager.scheduler.Locality;
-import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.clock.ManualClock;
 
@@ -41,6 +37,8 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
+import static org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotTestUtils.createPhysicalSlot;
+import static org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotTestUtils.occupyPhysicalSlot;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 
@@ -99,7 +97,7 @@ public class PhysicalSlotRequestBulkCheckerTest extends TestLogger {
 			Collections.singletonList(createPhysicalSlotRequest()));
 
 		final PhysicalSlot slot = addOneSlot();
-		occupySlot(slot, false);
+		occupyPhysicalSlot(slot, false);
 
 		assertThat(checkBulkTimeout(bulk), is(PhysicalSlotRequestBulkChecker.TimeoutCheckResult.PENDING));
 	}
@@ -153,7 +151,7 @@ public class PhysicalSlotRequestBulkCheckerTest extends TestLogger {
 		final PhysicalSlot slot1 = addOneSlot();
 		addOneSlot();
 
-		occupySlot(slot1, true);
+		occupyPhysicalSlot(slot1, true);
 
 		assertThat(isFulfillable(bulk), is(false));
 	}
@@ -166,7 +164,7 @@ public class PhysicalSlotRequestBulkCheckerTest extends TestLogger {
 		final PhysicalSlot slot1 = addOneSlot();
 		addOneSlot();
 
-		occupySlot(slot1, false);
+		occupyPhysicalSlot(slot1, false);
 
 		assertThat(isFulfillable(bulk), is(true));
 	}
@@ -178,22 +176,8 @@ public class PhysicalSlotRequestBulkCheckerTest extends TestLogger {
 			true);
 	}
 
-	private static void occupySlot(final PhysicalSlot slotToOccupy, final boolean slotWillBeOccupiedIndefinitely) {
-		SingleLogicalSlot.allocateFromPhysicalSlot(
-			new SlotRequestId(),
-			slotToOccupy,
-			Locality.UNKNOWN,
-			new TestingSlotOwner(),
-			slotWillBeOccupiedIndefinitely);
-	}
-
 	private PhysicalSlot addOneSlot() {
-		final PhysicalSlot slot = new AllocatedSlot(
-			new AllocationID(),
-			new LocalTaskManagerLocation(),
-			0,
-			ResourceProfile.ANY,
-			new SimpleAckingTaskManagerGateway());
+		final PhysicalSlot slot = createPhysicalSlot();
 		slots.add(slot);
 
 		return slot;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotOccupationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotTestUtils.java
similarity index 66%
copy from flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotOccupationTest.java
copy to flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotTestUtils.java
index f68ff75..a1bef43 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotOccupationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotTestUtils.java
@@ -26,35 +26,13 @@ import org.apache.flink.runtime.jobmanager.slots.TestingSlotOwner;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
 
 /**
- * Tests whether the slot occupation state of {@link AllocatedSlot} is correctly.
+ * Test utils of {@link PhysicalSlot}.
  */
-public class AllocatedSlotOccupationTest extends TestLogger {
-
-	@Test
-	public void testSingleTaskOccupyingSlotIndefinitely() {
-		final PhysicalSlot physicalSlot = createPhysicalSlot();
-		allocateSingleLogicalSlotFromPhysicalSlot(physicalSlot, true);
-
-		assertThat(physicalSlot.willBeOccupiedIndefinitely(), is(true));
-	}
+public class PhysicalSlotTestUtils {
 
-	@Test
-	public void testSingleTaskNotOccupyingSlotIndefinitely() {
-		final PhysicalSlot physicalSlot = createPhysicalSlot();
-		allocateSingleLogicalSlotFromPhysicalSlot(physicalSlot, false);
-
-		assertThat(physicalSlot.willBeOccupiedIndefinitely(), is(false));
-	}
-
-	private static PhysicalSlot createPhysicalSlot() {
+	public static PhysicalSlot createPhysicalSlot() {
 		return new AllocatedSlot(
 			new AllocationID(),
 			new LocalTaskManagerLocation(),
@@ -63,7 +41,7 @@ public class AllocatedSlotOccupationTest extends TestLogger {
 			new SimpleAckingTaskManagerGateway());
 	}
 
-	private static LogicalSlot allocateSingleLogicalSlotFromPhysicalSlot(
+	public static LogicalSlot occupyPhysicalSlot(
 			final PhysicalSlot physicalSlot,
 			final boolean slotWillBeOccupiedIndefinitely) {
 
@@ -74,4 +52,7 @@ public class AllocatedSlotOccupationTest extends TestLogger {
 			new TestingSlotOwner(),
 			slotWillBeOccupiedIndefinitely);
 	}
+
+	private PhysicalSlotTestUtils() {
+	}
 }


[flink] 06/07: [hotfix][runtime] Narrow down the access scope of DefaultExecutionSlotAllocator

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a10f1a47ca200bcc1d0b63e068459729854302dc
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Thu Jun 11 23:01:49 2020 +0800

    [hotfix][runtime] Narrow down the access scope of DefaultExecutionSlotAllocator
---
 .../apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.java | 4 ++--
 .../flink/runtime/scheduler/DefaultExecutionSlotAllocatorFactory.java | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.java
index 07da10e..f54aa24 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.java
@@ -43,13 +43,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * Default {@link ExecutionSlotAllocator} which will use {@link SlotProvider} to allocate slots and
  * keep the unfulfilled requests for further cancellation.
  */
-public class DefaultExecutionSlotAllocator extends AbstractExecutionSlotAllocator {
+class DefaultExecutionSlotAllocator extends AbstractExecutionSlotAllocator {
 
 	private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutionSlotAllocator.class);
 
 	private final SlotProviderStrategy slotProviderStrategy;
 
-	public DefaultExecutionSlotAllocator(
+	DefaultExecutionSlotAllocator(
 			final SlotProviderStrategy slotProviderStrategy,
 			final PreferredLocationsRetriever preferredLocationsRetriever) {
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorFactory.java
index ef8a4e1..4324c3b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorFactory.java
@@ -26,11 +26,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * Factory for {@link DefaultExecutionSlotAllocator}.
  */
-public class DefaultExecutionSlotAllocatorFactory implements ExecutionSlotAllocatorFactory {
+class DefaultExecutionSlotAllocatorFactory implements ExecutionSlotAllocatorFactory {
 
 	private final SlotProviderStrategy slotProvider;
 
-	public DefaultExecutionSlotAllocatorFactory(final SlotProviderStrategy slotProvider) {
+	DefaultExecutionSlotAllocatorFactory(final SlotProviderStrategy slotProvider) {
 		this.slotProvider = checkNotNull(slotProvider);
 	}
 


[flink] 04/07: [FLINK-17018][runtime] Introduce OneSlotPerExecutionSlotAllocator which will request one physical slot for each single execution vertex

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 95cf59a669610bfcf12a561281322113ea32cd90
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Thu Jun 11 22:57:00 2020 +0800

    [FLINK-17018][runtime] Introduce OneSlotPerExecutionSlotAllocator which will request one physical slot for each single execution vertex
    
    OneSlotPerExecutionSlotAllocator allocates slots in bulks so that the SlotProvider can check whether this bulk of slot requests can be fulfilled at the same time.
    It has several limitations:
    1. Slot sharing will be ignored.
    2. Co-location constraints are not allowed.
    3. Intra-bulk input location preferences will be ignored.
---
 .../jobmaster/slotpool/PhysicalSlotRequest.java    |   6 +-
 .../jobmaster/slotpool/SingleLogicalSlot.java      |   2 +-
 .../OneSlotPerExecutionSlotAllocator.java          | 217 +++++++++++++++
 .../OneSlotPerExecutionSlotAllocatorTest.java      | 304 +++++++++++++++++++++
 4 files changed, 525 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java
index b953e43..60030ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java
@@ -42,11 +42,11 @@ public class PhysicalSlotRequest {
 		this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
 	}
 
-	SlotRequestId getSlotRequestId() {
+	public SlotRequestId getSlotRequestId() {
 		return slotRequestId;
 	}
 
-	SlotProfile getSlotProfile() {
+	public SlotProfile getSlotProfile() {
 		return slotProfile;
 	}
 
@@ -63,7 +63,7 @@ public class PhysicalSlotRequest {
 
 		private final PhysicalSlot physicalSlot;
 
-		Result(final SlotRequestId slotRequestId, final PhysicalSlot physicalSlot) {
+		public Result(final SlotRequestId slotRequestId, final PhysicalSlot physicalSlot) {
 			this.slotRequestId = slotRequestId;
 			this.physicalSlot = physicalSlot;
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
index 710f003..f5ba44c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
@@ -166,7 +166,7 @@ public class SingleLogicalSlot implements LogicalSlot, PhysicalSlot.Payload {
 		return slotSharingGroupId;
 	}
 
-	static SingleLogicalSlot allocateFromPhysicalSlot(
+	public static SingleLogicalSlot allocateFromPhysicalSlot(
 			final SlotRequestId slotRequestId,
 			final PhysicalSlot physicalSlot,
 			final Locality locality,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocator.java
new file mode 100644
index 0000000..c350f5d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocator.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.BulkSlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This slot allocator will request one physical slot for each single execution vertex.
+ * The slots will be requested in bulks so that the {@link SlotProvider} can check
+ * whether this bulk of slot requests can be fulfilled at the same time.
+ * It has several limitations:
+ *
+ * <p>1. Slot sharing will be ignored.
+ *
+ * <p>2. Co-location constraints are not allowed.
+ *
+ * <p>3. Intra-bulk input location preferences will be ignored.
+ */
+class OneSlotPerExecutionSlotAllocator extends AbstractExecutionSlotAllocator implements SlotOwner {
+
+	private static final Logger LOG = LoggerFactory.getLogger(OneSlotPerExecutionSlotAllocator.class);
+
+	private final BulkSlotProvider slotProvider;
+
+	private final boolean slotWillBeOccupiedIndefinitely;
+
+	private final Time allocationTimeout;
+
+	OneSlotPerExecutionSlotAllocator(
+			final BulkSlotProvider slotProvider,
+			final PreferredLocationsRetriever preferredLocationsRetriever,
+			final boolean slotWillBeOccupiedIndefinitely,
+			final Time allocationTimeout) {
+
+		super(preferredLocationsRetriever);
+
+		this.slotProvider = checkNotNull(slotProvider);
+		this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
+		this.allocationTimeout = checkNotNull(allocationTimeout);
+	}
+
+	@Override
+	public List<SlotExecutionVertexAssignment> allocateSlotsFor(
+			final List<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements) {
+
+		validateSchedulingRequirements(executionVertexSchedulingRequirements);
+
+		validateNoCoLocationConstraint(executionVertexSchedulingRequirements);
+
+		// LinkedHashMap is needed to retain the given order
+		final LinkedHashMap<SlotRequestId, SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
+			createSlotExecutionVertexAssignments(executionVertexSchedulingRequirements);
+
+		final Map<ExecutionVertexID, SlotRequestId> executionVertexSlotRequestIds = slotExecutionVertexAssignments
+			.entrySet()
+			.stream()
+			.collect(Collectors.toMap(e -> e.getValue().getExecutionVertexId(), Map.Entry::getKey));
+
+		final List<CompletableFuture<PhysicalSlotRequest>> physicalSlotRequestFutures =
+			createPhysicalSlotRequestFutures(
+				executionVertexSchedulingRequirements,
+				executionVertexSlotRequestIds);
+
+		allocateSlotsForAssignments(
+			physicalSlotRequestFutures,
+			slotExecutionVertexAssignments);
+
+		return Collections.unmodifiableList(new ArrayList<>(slotExecutionVertexAssignments.values()));
+	}
+
+	private static void validateNoCoLocationConstraint(
+			final Collection<ExecutionVertexSchedulingRequirements> schedulingRequirements) {
+
+		final boolean hasCoLocationConstraint = schedulingRequirements.stream()
+			.anyMatch(r -> r.getCoLocationConstraint() != null);
+		checkState(
+			!hasCoLocationConstraint,
+			"Jobs with co-location constraints are not allowed to run with pipelined region scheduling strategy.");
+	}
+
+	private LinkedHashMap<SlotRequestId, SlotExecutionVertexAssignment> createSlotExecutionVertexAssignments(
+			final List<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements) {
+
+		final LinkedHashMap<SlotRequestId, SlotExecutionVertexAssignment> assignments = new LinkedHashMap<>();
+		for (ExecutionVertexSchedulingRequirements schedulingRequirements : executionVertexSchedulingRequirements) {
+			final ExecutionVertexID executionVertexId = schedulingRequirements.getExecutionVertexId();
+
+			final SlotRequestId slotRequestId = new SlotRequestId();
+			final SlotExecutionVertexAssignment slotExecutionVertexAssignment =
+				createAndRegisterSlotExecutionVertexAssignment(
+					executionVertexId,
+					new CompletableFuture<>(),
+					throwable -> slotProvider.cancelSlotRequest(slotRequestId, throwable));
+			assignments.put(slotRequestId, slotExecutionVertexAssignment);
+		}
+
+		return assignments;
+	}
+
+	private List<CompletableFuture<PhysicalSlotRequest>> createPhysicalSlotRequestFutures(
+			final List<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements,
+			final Map<ExecutionVertexID, SlotRequestId> executionVertexSlotRequestIds) {
+
+		final Set<AllocationID> allPreviousAllocationIds =
+			computeAllPriorAllocationIds(executionVertexSchedulingRequirements);
+
+		final List<CompletableFuture<PhysicalSlotRequest>> physicalSlotRequestFutures =
+			new ArrayList<>(executionVertexSchedulingRequirements.size());
+		for (ExecutionVertexSchedulingRequirements schedulingRequirements : executionVertexSchedulingRequirements) {
+			final ExecutionVertexID executionVertexId = schedulingRequirements.getExecutionVertexId();
+			final SlotRequestId slotRequestId = executionVertexSlotRequestIds.get(executionVertexId);
+
+			LOG.debug("Allocate slot with id {} for execution {}", slotRequestId, executionVertexId);
+
+			// use the task resource profile as the physical slot resource requirement since slot sharing is ignored
+			final CompletableFuture<SlotProfile> slotProfileFuture = getSlotProfileFuture(
+				schedulingRequirements,
+				schedulingRequirements.getTaskResourceProfile(),
+				executionVertexSlotRequestIds.keySet(),
+				allPreviousAllocationIds);
+
+			final CompletableFuture<PhysicalSlotRequest> physicalSlotRequestFuture =
+				slotProfileFuture.thenApply(
+					slotProfile -> createPhysicalSlotRequest(slotRequestId, slotProfile));
+			physicalSlotRequestFutures.add(physicalSlotRequestFuture);
+		}
+
+		return physicalSlotRequestFutures;
+	}
+
+	private PhysicalSlotRequest createPhysicalSlotRequest(
+			final SlotRequestId slotRequestId,
+			final SlotProfile slotProfile) {
+		return new PhysicalSlotRequest(slotRequestId, slotProfile, slotWillBeOccupiedIndefinitely);
+	}
+
+	private void allocateSlotsForAssignments(
+			final List<CompletableFuture<PhysicalSlotRequest>> physicalSlotRequestFutures,
+			final Map<SlotRequestId, SlotExecutionVertexAssignment> slotExecutionVertexAssignments) {
+
+		FutureUtils.combineAll(physicalSlotRequestFutures)
+			.thenCompose(physicalSlotRequests -> slotProvider.allocatePhysicalSlots(physicalSlotRequests, allocationTimeout))
+			.thenAccept(physicalSlotRequestResults -> {
+				for (PhysicalSlotRequest.Result result : physicalSlotRequestResults) {
+					final SlotRequestId slotRequestId = result.getSlotRequestId();
+					final SlotExecutionVertexAssignment assignment = slotExecutionVertexAssignments.get(slotRequestId);
+
+					checkState(assignment != null);
+
+					final LogicalSlot logicalSlot = SingleLogicalSlot.allocateFromPhysicalSlot(
+						slotRequestId,
+						result.getPhysicalSlot(),
+						Locality.UNKNOWN,
+						this,
+						slotWillBeOccupiedIndefinitely);
+					assignment.getLogicalSlotFuture().complete(logicalSlot);
+				}
+			})
+			.exceptionally(ex -> {
+				slotExecutionVertexAssignments.values().forEach(
+					assignment -> assignment.getLogicalSlotFuture().completeExceptionally(ex));
+				return null;
+			});
+	}
+
+	@Override
+	public void returnLogicalSlot(LogicalSlot logicalSlot) {
+		slotProvider.cancelSlotRequest(
+			logicalSlot.getSlotRequestId(),
+			new FlinkException("Slot is being returned to OneSlotPerExecutionSlotAllocator."));
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorTest.java
new file mode 100644
index 0000000..c74f762
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorTest.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.BulkSlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotTestUtils.createPhysicalSlot;
+import static org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorTestUtils.createSchedulingRequirements;
+import static org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorTestUtils.findSlotAssignmentByExecutionVertexId;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link OneSlotPerExecutionSlotAllocator}.
+ */
+public class OneSlotPerExecutionSlotAllocatorTest extends TestLogger {
+
+	private TestingBulkSlotProvider slotProvider;
+
+	@Before
+	public void setUp() throws Exception {
+		slotProvider = new TestingBulkSlotProvider();
+	}
+
+	@Test
+	public void testSucceededSlotAllocation() {
+		final ExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator();
+
+		final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
+		final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
+			createSchedulingRequirements(executionVertexId);
+
+		final Collection<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
+			executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+
+		assertThat(slotExecutionVertexAssignments, hasSize(1));
+
+		final SlotExecutionVertexAssignment slotAssignment = slotExecutionVertexAssignments.iterator().next();
+
+		assertThat(slotAssignment.getExecutionVertexId(), equalTo(executionVertexId));
+		assertThat(slotAssignment.getLogicalSlotFuture().isDone(), is(true));
+		assertThat(slotAssignment.getLogicalSlotFuture().isCompletedExceptionally(), is(false));
+	}
+
+	@Test
+	public void testFailedSlotAllocation() {
+		final OneSlotPerExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator();
+
+		final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
+		final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
+			createSchedulingRequirements(executionVertexId);
+
+		slotProvider.forceFailingSlotAllocation();
+		final Collection<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
+			executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+
+		final SlotExecutionVertexAssignment slotAssignment = slotExecutionVertexAssignments.iterator().next();
+
+		assertThat(slotAssignment.getLogicalSlotFuture().isCompletedExceptionally(), is(true));
+		assertThat(executionSlotAllocator.getPendingSlotAssignments().keySet(), hasSize(0));
+
+		final SlotRequestId slotRequestId = slotProvider.getSlotRequests().get(0).getSlotRequestId();
+		assertThat(slotProvider.getCancelledSlotRequestIds(), contains(slotRequestId));
+	}
+
+	@Test
+	public void testInterBulkInputLocationPreferencesAreRespected() {
+		final ExecutionVertexID producerId = new ExecutionVertexID(new JobVertexID(), 0);
+		final ExecutionVertexID consumerId = new ExecutionVertexID(new JobVertexID(), 0);
+
+		final TestingInputsLocationsRetriever inputsLocationsRetriever = new TestingInputsLocationsRetriever.Builder()
+			.connectConsumerToProducer(consumerId, producerId)
+			.build();
+
+		final ExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator(
+			new TestingStateLocationRetriever(),
+			inputsLocationsRetriever);
+
+		inputsLocationsRetriever.markScheduled(producerId);
+		final List<ExecutionVertexSchedulingRequirements> schedulingRequirementsForProducer =
+			createSchedulingRequirements(producerId);
+		final Collection<SlotExecutionVertexAssignment> slotExecutionVertexAssignmentsForProducer =
+			executionSlotAllocator.allocateSlotsFor(schedulingRequirementsForProducer);
+		final SlotExecutionVertexAssignment producerSlotAssignment =
+			findSlotAssignmentByExecutionVertexId(producerId, slotExecutionVertexAssignmentsForProducer);
+
+		assertThat(producerSlotAssignment.getLogicalSlotFuture().isDone(), is(true));
+
+		inputsLocationsRetriever.markScheduled(consumerId);
+		final List<ExecutionVertexSchedulingRequirements> schedulingRequirementsForConsumer =
+			createSchedulingRequirements(consumerId);
+		final Collection<SlotExecutionVertexAssignment> slotExecutionVertexAssignmentsForConsumer =
+			executionSlotAllocator.allocateSlotsFor(schedulingRequirementsForConsumer);
+		final SlotExecutionVertexAssignment consumerSlotAssignment =
+			findSlotAssignmentByExecutionVertexId(consumerId, slotExecutionVertexAssignmentsForConsumer);
+
+		assertThat(consumerSlotAssignment.getLogicalSlotFuture().isDone(), is(false));
+
+		inputsLocationsRetriever.assignTaskManagerLocation(producerId);
+
+		assertThat(consumerSlotAssignment.getLogicalSlotFuture().isDone(), is(true));
+	}
+
+	@Test
+	public void testIntraBulkInputLocationPreferencesDoNotBlockAllocation() {
+		final ExecutionVertexID producerId = new ExecutionVertexID(new JobVertexID(), 0);
+		final ExecutionVertexID consumerId = new ExecutionVertexID(new JobVertexID(), 0);
+
+		final TestingInputsLocationsRetriever inputsLocationsRetriever = new TestingInputsLocationsRetriever.Builder()
+			.connectConsumerToProducer(consumerId, producerId)
+			.build();
+
+		final ExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator(
+			new TestingStateLocationRetriever(),
+			inputsLocationsRetriever);
+
+		inputsLocationsRetriever.markScheduled(producerId);
+		inputsLocationsRetriever.markScheduled(consumerId);
+
+		final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
+			createSchedulingRequirements(producerId, consumerId);
+		final Collection<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
+			executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+
+		assertThat(slotExecutionVertexAssignments, hasSize(2));
+
+		final SlotExecutionVertexAssignment producerSlotAssignment =
+			findSlotAssignmentByExecutionVertexId(producerId, slotExecutionVertexAssignments);
+		final SlotExecutionVertexAssignment consumerSlotAssignment =
+			findSlotAssignmentByExecutionVertexId(consumerId, slotExecutionVertexAssignments);
+
+		assertThat(producerSlotAssignment.getLogicalSlotFuture().isDone(), is(true));
+		assertThat(consumerSlotAssignment.getLogicalSlotFuture().isDone(), is(true));
+	}
+
+	@Test
+	public void testCreatedSlotRequests() {
+		final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
+		final AllocationID allocationId = new AllocationID();
+		final SlotSharingGroupId sharingGroupId = new SlotSharingGroupId();
+		final ResourceProfile taskResourceProfile = ResourceProfile.fromResources(0.5, 250);
+		final ResourceProfile physicalSlotResourceProfile = ResourceProfile.fromResources(1.0, 300);
+		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+
+		final TestingStateLocationRetriever stateLocationRetriever = new TestingStateLocationRetriever();
+		stateLocationRetriever.setStateLocation(executionVertexId, taskManagerLocation);
+
+		final ExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator(
+			stateLocationRetriever,
+			new TestingInputsLocationsRetriever.Builder().build());
+
+		final List<ExecutionVertexSchedulingRequirements> schedulingRequirements = Collections.singletonList(
+			new ExecutionVertexSchedulingRequirements.Builder()
+				.withExecutionVertexId(executionVertexId)
+				.withPreviousAllocationId(allocationId)
+				.withSlotSharingGroupId(sharingGroupId)
+				.withPhysicalSlotResourceProfile(physicalSlotResourceProfile)
+				.withTaskResourceProfile(taskResourceProfile)
+				.build()
+		);
+
+		executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+		assertThat(slotProvider.getSlotRequests(), hasSize(1));
+
+		final SlotProfile requestSlotProfile = slotProvider.getSlotRequests().iterator().next().getSlotProfile();
+
+		assertThat(requestSlotProfile.getPreferredAllocations(), contains(allocationId));
+		assertThat(requestSlotProfile.getPreviousExecutionGraphAllocations(), contains(allocationId));
+		assertThat(requestSlotProfile.getTaskResourceProfile(), equalTo(taskResourceProfile));
+		assertThat(requestSlotProfile.getPreferredLocations(), contains(taskManagerLocation));
+		// task resource profile is used instead of slot sharing group resource profile since slot sharing is ignored
+		assertThat(requestSlotProfile.getPhysicalSlotResourceProfile(), equalTo(taskResourceProfile));
+	}
+
+	@Test(expected = IllegalStateException.class)
+	public void testCoLocationConstraintThrowsException() {
+		final ExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator();
+
+		final CoLocationConstraint coLocationConstraint = new CoLocationGroup().getLocationConstraint(0);
+		final List<ExecutionVertexSchedulingRequirements> schedulingRequirements = Collections.singletonList(
+			new ExecutionVertexSchedulingRequirements.Builder()
+				.withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 0))
+				.withCoLocationConstraint(coLocationConstraint)
+				.build()
+		);
+
+		executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+	}
+
+	private OneSlotPerExecutionSlotAllocator createExecutionSlotAllocator() {
+		return createExecutionSlotAllocator(
+			new TestingStateLocationRetriever(),
+			new TestingInputsLocationsRetriever.Builder().build());
+	}
+
+	private OneSlotPerExecutionSlotAllocator createExecutionSlotAllocator(
+			final StateLocationRetriever stateLocationRetriever,
+			final InputsLocationsRetriever inputsLocationsRetriever) {
+
+		return new OneSlotPerExecutionSlotAllocator(
+			slotProvider,
+			new DefaultPreferredLocationsRetriever(stateLocationRetriever, inputsLocationsRetriever),
+			true,
+			Time.seconds(10));
+	}
+
+	private static class TestingBulkSlotProvider implements BulkSlotProvider {
+
+		private final List<PhysicalSlotRequest> slotRequests = new ArrayList<>();
+
+		private final List<SlotRequestId> cancelledSlotRequestIds = new ArrayList<>();
+
+		private boolean forceFailingSlotAllocation = false;
+
+		@Override
+		public void start(ComponentMainThreadExecutor mainThreadExecutor) {
+		}
+
+		@Override
+		public CompletableFuture<Collection<PhysicalSlotRequest.Result>> allocatePhysicalSlots(
+				final Collection<PhysicalSlotRequest> physicalSlotRequests,
+				final Time timeout) {
+
+			slotRequests.addAll(physicalSlotRequests);
+
+			if (forceFailingSlotAllocation) {
+				return FutureUtils.completedExceptionally(new Exception("Forced failure"));
+			}
+
+			final List<PhysicalSlotRequest.Result> results = new ArrayList<>(physicalSlotRequests.size());
+			for (PhysicalSlotRequest request : physicalSlotRequests) {
+				final PhysicalSlotRequest.Result result = new PhysicalSlotRequest.Result(
+					request.getSlotRequestId(),
+					createPhysicalSlot());
+				results.add(result);
+			}
+			return CompletableFuture.completedFuture(results);
+		}
+
+		@Override
+		public void cancelSlotRequest(
+				final SlotRequestId slotRequestId,
+				final Throwable cause) {
+			cancelledSlotRequestIds.add(slotRequestId);
+		}
+
+		List<PhysicalSlotRequest> getSlotRequests() {
+			return Collections.unmodifiableList(slotRequests);
+		}
+
+		List<SlotRequestId> getCancelledSlotRequestIds() {
+			return Collections.unmodifiableList(cancelledSlotRequestIds);
+		}
+
+		void forceFailingSlotAllocation() {
+			this.forceFailingSlotAllocation = true;
+		}
+	}
+
+}


[flink] 07/07: [hotfix][runtime] Narrow down the access scope of SlotExecutionVertexAssignment

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0aea300ccf23780221bd9d628511b698a0aa519b
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Tue Jun 16 00:03:55 2020 +0800

    [hotfix][runtime] Narrow down the access scope of SlotExecutionVertexAssignment
---
 .../flink/runtime/scheduler/SlotExecutionVertexAssignment.java    | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotExecutionVertexAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotExecutionVertexAssignment.java
index b52c6af..a25209f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotExecutionVertexAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotExecutionVertexAssignment.java
@@ -29,24 +29,24 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * The slot assignment for a {@link ExecutionVertex}.
  */
-public class SlotExecutionVertexAssignment {
+class SlotExecutionVertexAssignment {
 
 	private final ExecutionVertexID executionVertexId;
 
 	private final CompletableFuture<LogicalSlot> logicalSlotFuture;
 
-	public SlotExecutionVertexAssignment(
+	SlotExecutionVertexAssignment(
 			ExecutionVertexID executionVertexId,
 			CompletableFuture<LogicalSlot> logicalSlotFuture) {
 		this.executionVertexId = checkNotNull(executionVertexId);
 		this.logicalSlotFuture = checkNotNull(logicalSlotFuture);
 	}
 
-	public ExecutionVertexID getExecutionVertexId() {
+	ExecutionVertexID getExecutionVertexId() {
 		return executionVertexId;
 	}
 
-	public CompletableFuture<LogicalSlot> getLogicalSlotFuture() {
+	CompletableFuture<LogicalSlot> getLogicalSlotFuture() {
 		return logicalSlotFuture;
 	}
 }


[flink] 03/07: [FLINK-17018][runtime] Extract common logics of DefaultExecutionSlotAllocator into AbstractExecutionSlotAllocator

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6d9eb50862624cb2af2d77e747550e8ce28908bd
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Thu Jun 11 22:50:00 2020 +0800

    [FLINK-17018][runtime] Extract common logics of DefaultExecutionSlotAllocator into AbstractExecutionSlotAllocator
---
 .../scheduler/AbstractExecutionSlotAllocator.java  | 131 +++++++++++++++
 .../scheduler/DefaultExecutionSlotAllocator.java   | 132 +++++----------
 .../AbstractExecutionSlotAllocatorTest.java        | 178 +++++++++++++++++++++
 .../DefaultExecutionSlotAllocatorTest.java         | 128 ++++-----------
 4 files changed, 374 insertions(+), 195 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocator.java
new file mode 100644
index 0000000..d8dbf3b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocator.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Base class for all {@link ExecutionSlotAllocator} implementations. It is responsible for allocating
+ * slots for tasks and for tracking unfulfilled slot requests so that they can be cancelled later.
+ */
+abstract class AbstractExecutionSlotAllocator implements ExecutionSlotAllocator {
+
+	private final Map<ExecutionVertexID, SlotExecutionVertexAssignment> pendingSlotAssignments;
+
+	private final PreferredLocationsRetriever preferredLocationsRetriever;
+
+	AbstractExecutionSlotAllocator(final PreferredLocationsRetriever preferredLocationsRetriever) {
+		this.preferredLocationsRetriever = checkNotNull(preferredLocationsRetriever);
+		this.pendingSlotAssignments = new HashMap<>();
+	}
+
+	@Override
+	public void cancel(final ExecutionVertexID executionVertexId) {
+		final SlotExecutionVertexAssignment slotExecutionVertexAssignment = pendingSlotAssignments.get(executionVertexId);
+		if (slotExecutionVertexAssignment != null) {
+			slotExecutionVertexAssignment.getLogicalSlotFuture().cancel(false);
+		}
+	}
+
+	void validateSchedulingRequirements(final Collection<ExecutionVertexSchedulingRequirements> schedulingRequirements) {
+		schedulingRequirements.stream()
+			.map(ExecutionVertexSchedulingRequirements::getExecutionVertexId)
+			.forEach(id -> checkState(
+				!pendingSlotAssignments.containsKey(id),
+				"BUG: vertex %s tries to allocate a slot when its previous slot request is still pending", id));
+	}
+
+	SlotExecutionVertexAssignment createAndRegisterSlotExecutionVertexAssignment(
+			final ExecutionVertexID executionVertexId,
+			final CompletableFuture<LogicalSlot> logicalSlotFuture,
+			final Consumer<Throwable> slotRequestFailureHandler) {
+
+		final SlotExecutionVertexAssignment slotExecutionVertexAssignment =
+			new SlotExecutionVertexAssignment(executionVertexId, logicalSlotFuture);
+
+		// add to map first in case the slot future is already completed
+		pendingSlotAssignments.put(executionVertexId, slotExecutionVertexAssignment);
+
+		logicalSlotFuture.whenComplete(
+			(ignored, throwable) -> {
+				pendingSlotAssignments.remove(executionVertexId);
+				if (throwable != null) {
+					slotRequestFailureHandler.accept(throwable);
+				}
+			});
+
+		return slotExecutionVertexAssignment;
+	}
+
+	CompletableFuture<SlotProfile> getSlotProfileFuture(
+			final ExecutionVertexSchedulingRequirements schedulingRequirements,
+			final ResourceProfile physicalSlotResourceProfile,
+			final Set<ExecutionVertexID> producersToIgnore,
+			final Set<AllocationID> allPreviousAllocationIds) {
+
+		final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture =
+			preferredLocationsRetriever.getPreferredLocations(
+				schedulingRequirements.getExecutionVertexId(),
+				producersToIgnore);
+
+		return preferredLocationsFuture.thenApply(
+			preferredLocations ->
+				SlotProfile.priorAllocation(
+					schedulingRequirements.getTaskResourceProfile(),
+					physicalSlotResourceProfile,
+					preferredLocations,
+					Collections.singletonList(schedulingRequirements.getPreviousAllocationId()),
+					allPreviousAllocationIds));
+	}
+
+	@VisibleForTesting
+	static Set<AllocationID> computeAllPriorAllocationIds(
+			final Collection<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements) {
+
+		return executionVertexSchedulingRequirements
+			.stream()
+			.map(ExecutionVertexSchedulingRequirements::getPreviousAllocationId)
+			.filter(Objects::nonNull)
+			.collect(Collectors.toSet());
+	}
+
+	@VisibleForTesting
+	Map<ExecutionVertexID, SlotExecutionVertexAssignment> getPendingSlotAssignments() {
+		return Collections.unmodifiableMap(pendingSlotAssignments);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.java
index c7b2dd9..07da10e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.scheduler;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.executiongraph.SlotProviderStrategy;
@@ -28,49 +27,34 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Default {@link ExecutionSlotAllocator} which will use {@link SlotProvider} to allocate slots and
  * keep the unfulfilled requests for further cancellation.
  */
-public class DefaultExecutionSlotAllocator implements ExecutionSlotAllocator {
+public class DefaultExecutionSlotAllocator extends AbstractExecutionSlotAllocator {
 
 	private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutionSlotAllocator.class);
 
-	/**
-	 * Store the uncompleted slot assignments.
-	 */
-	private final Map<ExecutionVertexID, SlotExecutionVertexAssignment> pendingSlotAssignments;
-
 	private final SlotProviderStrategy slotProviderStrategy;
 
-	private final PreferredLocationsRetriever preferredLocationsRetriever;
-
 	public DefaultExecutionSlotAllocator(
 			final SlotProviderStrategy slotProviderStrategy,
 			final PreferredLocationsRetriever preferredLocationsRetriever) {
-		this.slotProviderStrategy = checkNotNull(slotProviderStrategy);
-		this.preferredLocationsRetriever = checkNotNull(preferredLocationsRetriever);
 
-		pendingSlotAssignments = new HashMap<>();
+		super(preferredLocationsRetriever);
+		this.slotProviderStrategy = checkNotNull(slotProviderStrategy);
 	}
 
 	@Override
@@ -86,89 +70,49 @@ public class DefaultExecutionSlotAllocator implements ExecutionSlotAllocator {
 
 		for (ExecutionVertexSchedulingRequirements schedulingRequirements : executionVertexSchedulingRequirements) {
 			final ExecutionVertexID executionVertexId = schedulingRequirements.getExecutionVertexId();
-			final SlotRequestId slotRequestId = new SlotRequestId();
 			final SlotSharingGroupId slotSharingGroupId = schedulingRequirements.getSlotSharingGroupId();
 
-			LOG.debug("Allocate slot with id {} for execution {}", slotRequestId, executionVertexId);
-
-			CompletableFuture<LogicalSlot> slotFuture = calculatePreferredLocations(
-					executionVertexId).thenCompose(
-							(Collection<TaskManagerLocation> preferredLocations) ->
-								slotProviderStrategy.allocateSlot(
-									slotRequestId,
-									new ScheduledUnit(
-										executionVertexId,
-										slotSharingGroupId,
-										schedulingRequirements.getCoLocationConstraint()),
-									SlotProfile.priorAllocation(
-										schedulingRequirements.getTaskResourceProfile(),
-										schedulingRequirements.getPhysicalSlotResourceProfile(),
-										preferredLocations,
-										Collections.singletonList(schedulingRequirements.getPreviousAllocationId()),
-										allPreviousAllocationIds)));
-
-			SlotExecutionVertexAssignment slotExecutionVertexAssignment =
-					new SlotExecutionVertexAssignment(executionVertexId, slotFuture);
-			// add to map first to avoid the future completed before added.
-			pendingSlotAssignments.put(executionVertexId, slotExecutionVertexAssignment);
-
-			slotFuture.whenComplete(
-					(ignored, throwable) -> {
-						pendingSlotAssignments.remove(executionVertexId);
-						if (throwable != null) {
-							slotProviderStrategy.cancelSlotRequest(slotRequestId, slotSharingGroupId, throwable);
-						}
-					});
-
-			slotExecutionVertexAssignments.add(slotExecutionVertexAssignment);
-		}
+			final SlotRequestId slotRequestId = new SlotRequestId();
 
-		return slotExecutionVertexAssignments;
-	}
+			final CompletableFuture<LogicalSlot> slotFuture = allocateSlot(
+				schedulingRequirements,
+				slotRequestId,
+				allPreviousAllocationIds);
 
-	private void validateSchedulingRequirements(Collection<ExecutionVertexSchedulingRequirements> schedulingRequirements) {
-		schedulingRequirements.stream()
-			.map(ExecutionVertexSchedulingRequirements::getExecutionVertexId)
-			.forEach(id -> checkState(
-				!pendingSlotAssignments.containsKey(id),
-				"BUG: vertex %s tries to allocate a slot when its previous slot request is still pending", id));
-	}
+			final SlotExecutionVertexAssignment slotExecutionVertexAssignment =
+				createAndRegisterSlotExecutionVertexAssignment(
+					executionVertexId,
+					slotFuture,
+					throwable -> slotProviderStrategy.cancelSlotRequest(slotRequestId, slotSharingGroupId, throwable));
 
-	@Override
-	public void cancel(ExecutionVertexID executionVertexId) {
-		SlotExecutionVertexAssignment slotExecutionVertexAssignment = pendingSlotAssignments.get(executionVertexId);
-		if (slotExecutionVertexAssignment != null) {
-			slotExecutionVertexAssignment.getLogicalSlotFuture().cancel(false);
+			slotExecutionVertexAssignments.add(slotExecutionVertexAssignment);
 		}
-	}
-
-	/**
-	 * Calculates the preferred locations for an execution.
-	 * It will first try to use preferred locations based on state,
-	 * if null, will use the preferred locations based on inputs.
-	 */
-	private CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(
-			ExecutionVertexID executionVertexId) {
-		return preferredLocationsRetriever.getPreferredLocations(executionVertexId, Collections.emptySet());
-	}
 
-	/**
-	 * Computes and returns a set with the prior allocation ids from all execution vertices scheduled together.
-	 *
-	 * @param executionVertexSchedulingRequirements contains the execution vertices which are scheduled together
-	 */
-	@VisibleForTesting
-	static Set<AllocationID> computeAllPriorAllocationIds(
-			Collection<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements) {
-		return executionVertexSchedulingRequirements
-			.stream()
-			.map(ExecutionVertexSchedulingRequirements::getPreviousAllocationId)
-			.filter(Objects::nonNull)
-			.collect(Collectors.toSet());
+		return slotExecutionVertexAssignments;
 	}
 
-	@VisibleForTesting
-	int getNumberOfPendingSlotAssignments() {
-		return pendingSlotAssignments.size();
+	private CompletableFuture<LogicalSlot> allocateSlot(
+			final ExecutionVertexSchedulingRequirements schedulingRequirements,
+			final SlotRequestId slotRequestId,
+			final Set<AllocationID> allPreviousAllocationIds) {
+
+		final ExecutionVertexID executionVertexId = schedulingRequirements.getExecutionVertexId();
+
+		LOG.debug("Allocate slot with id {} for execution {}", slotRequestId, executionVertexId);
+
+		final CompletableFuture<SlotProfile> slotProfileFuture = getSlotProfileFuture(
+			schedulingRequirements,
+			schedulingRequirements.getPhysicalSlotResourceProfile(),
+			Collections.emptySet(),
+			allPreviousAllocationIds);
+
+		return slotProfileFuture.thenCompose(
+			slotProfile -> slotProviderStrategy.allocateSlot(
+				slotRequestId,
+				new ScheduledUnit(
+					executionVertexId,
+					schedulingRequirements.getSlotSharingGroupId(),
+					schedulingRequirements.getCoLocationConstraint()),
+				slotProfile));
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocatorTest.java
new file mode 100644
index 0000000..4077930
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocatorTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link AbstractExecutionSlotAllocator}.
+ *
+ * <p>The allocator under test is a minimal {@code TestingExecutionSlotAllocator} which registers
+ * one assignment per requested execution vertex, each backed by a fresh, uncompleted future.
+ */
+public class AbstractExecutionSlotAllocatorTest extends TestLogger {
+
+	private AbstractExecutionSlotAllocator executionSlotAllocator;
+
+	@Before
+	public void setUp() throws Exception {
+		executionSlotAllocator = new TestingExecutionSlotAllocator();
+	}
+
+	/**
+	 * Tests that cancelling an execution vertex cancels the logical slot future of its assignment.
+	 */
+	@Test
+	public void testCancel() {
+		final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
+
+		final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
+			createSchedulingRequirements(executionVertexId);
+		final List<SlotExecutionVertexAssignment> assignments =
+			executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+
+		executionSlotAllocator.cancel(executionVertexId);
+
+		assertThat(assignments.get(0).getLogicalSlotFuture().isCancelled(), is(true));
+	}
+
+	/**
+	 * Tests that validating requirements of an execution vertex which already has a pending
+	 * slot assignment fails with an {@link IllegalStateException}.
+	 */
+	@Test
+	public void testValidateSchedulingRequirements() {
+		final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
+
+		final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
+			createSchedulingRequirements(executionVertexId);
+		executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+
+		// Only the validation call may throw; an IllegalStateException raised by the set-up
+		// above must not make this test pass.
+		try {
+			executionSlotAllocator.validateSchedulingRequirements(schedulingRequirements);
+		} catch (IllegalStateException expected) {
+			// expected: the same execution vertex was already handed to allocateSlotsFor above
+			return;
+		}
+		throw new AssertionError(
+			"validateSchedulingRequirements should throw an IllegalStateException " +
+				"for an execution vertex that already has a pending slot assignment");
+	}
+
+	/**
+	 * Tests that a created assignment refers to the requested vertex, is not done yet, is
+	 * tracked as pending, and is dropped from the pending assignments once its future is cancelled.
+	 */
+	@Test
+	public void testCreateAndRegisterSlotExecutionVertexAssignment() {
+		final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
+
+		final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
+			createSchedulingRequirements(executionVertexId);
+		final List<SlotExecutionVertexAssignment> assignments =
+			executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+
+		assertThat(assignments, hasSize(1));
+
+		final SlotExecutionVertexAssignment assignment = assignments.get(0);
+		assertThat(assignment.getExecutionVertexId(), is(executionVertexId));
+		assertThat(assignment.getLogicalSlotFuture().isDone(), is(false));
+		assertThat(executionSlotAllocator.getPendingSlotAssignments().values(), contains(assignment));
+
+		assignment.getLogicalSlotFuture().cancel(false);
+
+		assertThat(executionSlotAllocator.getPendingSlotAssignments().keySet(), hasSize(0));
+	}
+
+	/**
+	 * Tests that no pending assignment remains after the logical slot future has been
+	 * completed (here: by cancelling it).
+	 */
+	@Test
+	public void testCompletedExecutionVertexAssignmentWillBeUnregistered() {
+		final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
+
+		final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
+			createSchedulingRequirements(executionVertexId);
+		final List<SlotExecutionVertexAssignment> assignments =
+			executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+
+		assignments.get(0).getLogicalSlotFuture().cancel(false);
+
+		assertThat(executionSlotAllocator.getPendingSlotAssignments().keySet(), hasSize(0));
+	}
+
+	/**
+	 * Tests that the prior allocation ids are the de-duplicated previous allocation ids of all
+	 * scheduling requirements, ignoring requirements without a previous allocation.
+	 */
+	@Test
+	public void testComputeAllPriorAllocationIds() {
+		final List<AllocationID> expectAllocationIds = Arrays.asList(new AllocationID(), new AllocationID());
+		final List<ExecutionVertexSchedulingRequirements> testSchedulingRequirements = Arrays.asList(
+			new ExecutionVertexSchedulingRequirements.Builder().
+				withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 0)).
+				withPreviousAllocationId(expectAllocationIds.get(0)).
+				build(),
+			// same previous allocation id as above: must show up only once in the result
+			new ExecutionVertexSchedulingRequirements.Builder().
+				withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 1)).
+				withPreviousAllocationId(expectAllocationIds.get(0)).
+				build(),
+			new ExecutionVertexSchedulingRequirements.Builder().
+				withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 2)).
+				withPreviousAllocationId(expectAllocationIds.get(1)).
+				build(),
+			// no previous allocation id: must not contribute to the result
+			new ExecutionVertexSchedulingRequirements.Builder().
+				withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 3)).
+				build()
+		);
+
+		final Set<AllocationID> allPriorAllocationIds =
+			AbstractExecutionSlotAllocator.computeAllPriorAllocationIds(testSchedulingRequirements);
+		assertThat(allPriorAllocationIds, containsInAnyOrder(expectAllocationIds.toArray()));
+	}
+
+	/**
+	 * Builds one scheduling requirement per given execution vertex id, in the given order.
+	 *
+	 * @param executionVertexIds ids of the execution vertices to create requirements for
+	 * @return the created scheduling requirements
+	 */
+	private static List<ExecutionVertexSchedulingRequirements> createSchedulingRequirements(
+			final ExecutionVertexID... executionVertexIds) {
+
+		final List<ExecutionVertexSchedulingRequirements> schedulingRequirements = new ArrayList<>(executionVertexIds.length);
+
+		for (ExecutionVertexID executionVertexId : executionVertexIds) {
+			schedulingRequirements.add(new ExecutionVertexSchedulingRequirements.Builder()
+				.withExecutionVertexId(executionVertexId).build());
+		}
+		return schedulingRequirements;
+	}
+
+	/**
+	 * Allocator which registers an assignment with a fresh, uncompleted future and a no-op
+	 * failure handler for every requested execution vertex.
+	 */
+	private static class TestingExecutionSlotAllocator extends AbstractExecutionSlotAllocator {
+
+		TestingExecutionSlotAllocator() {
+			super(
+				new DefaultPreferredLocationsRetriever(
+					new TestingStateLocationRetriever(),
+					new TestingInputsLocationsRetriever.Builder().build()));
+		}
+
+		@Override
+		public List<SlotExecutionVertexAssignment> allocateSlotsFor(
+				final List<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements) {
+
+			final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
+				new ArrayList<>(executionVertexSchedulingRequirements.size());
+
+			for (ExecutionVertexSchedulingRequirements schedulingRequirements : executionVertexSchedulingRequirements) {
+				slotExecutionVertexAssignments.add(
+					createAndRegisterSlotExecutionVertexAssignment(
+						schedulingRequirements.getExecutionVertexId(),
+						new CompletableFuture<>(),
+						throwable -> {}));
+			}
+
+			return slotExecutionVertexAssignments;
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
index ad74357..0646a32 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
@@ -49,18 +49,12 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
 
 import static org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorTestUtils.createSchedulingRequirements;
 import static org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorTestUtils.findSlotAssignmentByExecutionVertexId;
 import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -111,7 +105,7 @@ public class DefaultExecutionSlotAllocatorTest extends TestLogger {
 		inputsLocationsRetriever.assignTaskManagerLocation(producerId);
 
 		assertTrue(consumerSlotAssignment.getLogicalSlotFuture().isDone());
-		assertEquals(0, executionSlotAllocator.getNumberOfPendingSlotAssignments());
+		assertThat(executionSlotAllocator.getPendingSlotAssignments().keySet(), hasSize(0));
 	}
 
 	/**
@@ -160,109 +154,47 @@ public class DefaultExecutionSlotAllocatorTest extends TestLogger {
 		assertThat(expectedSlotProfile.getPreferredLocations(), contains(taskManagerLocation));
 	}
 
-	/**
-	 * Tests that cancels an execution vertex which is not existed.
-	 */
 	@Test
-	public void testCancelNonExistingExecutionVertex() {
-		final DefaultExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator();
-
-		ExecutionVertexID inValidExecutionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
-		executionSlotAllocator.cancel(inValidExecutionVertexId);
-
-		assertThat(slotProvider.getCancelledSlotRequestIds(), is(empty()));
-	}
-
-	/**
-	 * Tests that cancels a slot request which has already been fulfilled.
-	 */
-	@Test
-	public void testCancelFulfilledSlotRequest() {
-		final ExecutionVertexID producerId = new ExecutionVertexID(new JobVertexID(), 0);
+	public void testDuplicatedSlotAllocationIsNotAllowed() {
+		final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
 
 		final DefaultExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator();
+		slotProvider.disableSlotAllocation();
 
 		final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
-				createSchedulingRequirements(producerId);
+			createSchedulingRequirements(executionVertexId);
 		executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
 
-		executionSlotAllocator.cancel(producerId);
-
-		assertThat(slotProvider.getCancelledSlotRequestIds(), is(empty()));
+		try {
+			executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+			fail("exception should happen");
+		} catch (IllegalStateException e) {
+			// IllegalStateException is expected
+		}
 	}
 
-	/**
-	 * Tests that cancels a slot request which has not been fulfilled.
-	 */
 	@Test
-	public void testCancelUnFulfilledSlotRequest() throws Exception {
-		final ExecutionVertexID producerId = new ExecutionVertexID(new JobVertexID(), 0);
-
+	public void testSlotAssignmentIsProperlyRegistered() {
 		final DefaultExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator();
 
-		slotProvider.disableSlotAllocation();
+		final ExecutionVertexID executionVertexID = new ExecutionVertexID(new JobVertexID(), 0);
 		final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
-				createSchedulingRequirements(producerId);
-		Collection<SlotExecutionVertexAssignment> assignments = executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
-
-		executionSlotAllocator.cancel(producerId);
-
-		assertThat(slotProvider.getCancelledSlotRequestIds(), hasSize(1));
-		assertThat(slotProvider.getCancelledSlotRequestIds(), contains(slotProvider.getReceivedSlotRequestIds().toArray()));
-
-		try {
-			assignments.iterator().next().getLogicalSlotFuture().get();
-			fail("Expect a CancellationException but got nothing.");
-		} catch (CancellationException ignored) {
-			// Expected exception
-		}
-	}
+			createSchedulingRequirements(executionVertexID);
 
-	/**
-	 * Tests that all prior allocation ids are computed by union all previous allocation ids in scheduling requirements.
-	 */
-	@Test
-	public void testComputeAllPriorAllocationIds() {
-		List<AllocationID> expectAllocationIds = Arrays.asList(new AllocationID(), new AllocationID());
-		List<ExecutionVertexSchedulingRequirements> testSchedulingRequirements = Arrays.asList(
-				new ExecutionVertexSchedulingRequirements.Builder().
-						withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 0)).
-						withPreviousAllocationId(expectAllocationIds.get(0)).
-						build(),
-				new ExecutionVertexSchedulingRequirements.Builder().
-						withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 1)).
-						withPreviousAllocationId(expectAllocationIds.get(0)).
-						build(),
-				new ExecutionVertexSchedulingRequirements.Builder().
-						withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 2)).
-						withPreviousAllocationId(expectAllocationIds.get(1)).
-						build(),
-				new ExecutionVertexSchedulingRequirements.Builder().
-						withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 3)).
-						build()
-		);
+		slotProvider.disableSlotAllocation();
+		final Collection<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
+			executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
 
-		Set<AllocationID> allPriorAllocationIds = DefaultExecutionSlotAllocator.computeAllPriorAllocationIds(testSchedulingRequirements);
-		assertThat(allPriorAllocationIds, containsInAnyOrder(expectAllocationIds.toArray()));
-	}
+		final SlotExecutionVertexAssignment slotAssignment = slotExecutionVertexAssignments.iterator().next();
 
-	@Test
-	public void testDuplicatedSlotAllocationIsNotAllowed() {
-		final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
+		assertThat(executionSlotAllocator.getPendingSlotAssignments().values(), contains(slotAssignment));
 
-		final DefaultExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator();
-		slotProvider.disableSlotAllocation();
+		executionSlotAllocator.cancel(executionVertexID);
 
-		final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
-			createSchedulingRequirements(executionVertexId);
-		executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+		assertThat(executionSlotAllocator.getPendingSlotAssignments().keySet(), hasSize(0));
 
-		try {
-			executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
-			fail("exception should happen");
-		} catch (IllegalStateException e) {
-			// IllegalStateException is expected
-		}
+		final SlotRequestId slotRequestId = slotProvider.slotAllocationRequests.get(0).f0;
+		assertThat(slotProvider.getCancelledSlotRequestIds(), contains(slotRequestId));
 	}
 
 	private DefaultExecutionSlotAllocator createExecutionSlotAllocator() {
@@ -317,18 +249,12 @@ public class DefaultExecutionSlotAllocatorTest extends TestLogger {
 			return Collections.unmodifiableList(slotAllocationRequests);
 		}
 
-		public List<SlotRequestId> getReceivedSlotRequestIds() {
-			return slotAllocationRequests.stream()
-					.map(requestTuple -> requestTuple.f0)
-					.collect(Collectors.toList());
-		}
-
-		public List<SlotRequestId> getCancelledSlotRequestIds() {
-			return Collections.unmodifiableList(cancelledSlotRequestIds);
-		}
-
 		public void disableSlotAllocation() {
 			slotAllocationDisabled = true;
 		}
+
+		/**
+		 * Returns the slot request ids recorded in {@code cancelledSlotRequestIds}.
+		 *
+		 * <p>A read-only view is returned, like the getter for the slot allocation requests does,
+		 * so that tests cannot alter the recorded state by accident.
+		 *
+		 * @return unmodifiable view of the recorded cancelled slot request ids
+		 */
+		List<SlotRequestId> getCancelledSlotRequestIds() {
+			return Collections.unmodifiableList(cancelledSlotRequestIds);
+		}
 	}
 }


[flink] 05/07: [FLINK-17018][runtime] Use OneSlotPerExecutionSlotAllocator on pipelined region scheduling

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d75f186d99e8699a2dac802de803b25fdf612e01
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Sat Jun 6 13:26:28 2020 +0800

    [FLINK-17018][runtime] Use OneSlotPerExecutionSlotAllocator on pipelined region scheduling
---
 .../runtime/scheduler/DefaultSchedulerFactory.java | 34 +++++++++++--
 .../OneSlotPerExecutionSlotAllocatorFactory.java   | 55 ++++++++++++++++++++++
 2 files changed, 84 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index 1671b5e..333472b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
 import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
 import org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 
@@ -79,10 +80,12 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 			.create();
 		log.info("Using restart back off time strategy {} for {} ({}).", restartBackoffTimeStrategy, jobGraph.getName(), jobGraph.getJobID());
 
-		final SlotProviderStrategy slotProviderStrategy = SlotProviderStrategy.from(
-			jobGraph.getScheduleMode(),
-			slotProvider,
-			slotRequestTimeout);
+		final ExecutionSlotAllocatorFactory slotAllocatorFactory =
+			createExecutionSlotAllocatorFactory(
+				jobGraph.getScheduleMode(),
+				slotProvider,
+				slotRequestTimeout,
+				schedulingStrategyFactory);
 
 		return new DefaultScheduler(
 			log,
@@ -104,7 +107,7 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 			restartBackoffTimeStrategy,
 			new DefaultExecutionVertexOperations(),
 			new ExecutionVertexVersioner(),
-			new DefaultExecutionSlotAllocatorFactory(slotProviderStrategy));
+			slotAllocatorFactory);
 	}
 
 	static SchedulingStrategyFactory createSchedulingStrategyFactory(final ScheduleMode scheduleMode) {
@@ -118,4 +121,25 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
 				throw new IllegalStateException("Unsupported schedule mode " + scheduleMode);
 		}
 	}
+
+	/**
+	 * Picks the {@link ExecutionSlotAllocatorFactory} matching the scheduling strategy.
+	 *
+	 * <p>A {@link PipelinedRegionSchedulingStrategy.Factory} gets a
+	 * {@link OneSlotPerExecutionSlotAllocatorFactory}; every other strategy factory gets a
+	 * {@link DefaultExecutionSlotAllocatorFactory} backed by a {@link SlotProviderStrategy}.
+	 *
+	 * @param scheduleMode schedule mode of the job
+	 * @param slotProvider provider the slots are requested from
+	 * @param slotRequestTimeout timeout handed to the created factory or slot provider strategy
+	 * @param schedulingStrategyFactory factory of the scheduling strategy in use
+	 * @return the slot allocator factory to use
+	 */
+	private static ExecutionSlotAllocatorFactory createExecutionSlotAllocatorFactory(
+			final ScheduleMode scheduleMode,
+			final SlotProvider slotProvider,
+			final Time slotRequestTimeout,
+			final SchedulingStrategyFactory schedulingStrategyFactory) {
+
+		final boolean pipelinedRegionScheduling =
+			schedulingStrategyFactory instanceof PipelinedRegionSchedulingStrategy.Factory;
+
+		if (!pipelinedRegionScheduling) {
+			return new DefaultExecutionSlotAllocatorFactory(
+				SlotProviderStrategy.from(scheduleMode, slotProvider, slotRequestTimeout));
+		}
+
+		// Passed as the factory's slotWillBeOccupiedIndefinitely flag.
+		final boolean slotWillBeOccupiedIndefinitely =
+			scheduleMode != ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST;
+
+		return new OneSlotPerExecutionSlotAllocatorFactory(
+			slotProvider,
+			slotWillBeOccupiedIndefinitely,
+			slotRequestTimeout);
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java
new file mode 100644
index 0000000..8412b05
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Factory for {@link OneSlotPerExecutionSlotAllocator}.
+ *
+ * <p>Holds the settings shared by all allocators it creates; only the
+ * {@link PreferredLocationsRetriever} is supplied per created instance.
+ */
+class OneSlotPerExecutionSlotAllocatorFactory implements ExecutionSlotAllocatorFactory {
+
+	/** Slot provider handed to every created allocator. */
+	private final SlotProvider slotProvider;
+
+	/** Flag forwarded unchanged to every created allocator. */
+	private final boolean slotWillBeOccupiedIndefinitely;
+
+	/** Allocation timeout handed to every created allocator. */
+	private final Time allocationTimeout;
+
+	/**
+	 * Creates the factory.
+	 *
+	 * @param slotProvider slot provider for the created allocators, must not be null
+	 * @param slotWillBeOccupiedIndefinitely flag forwarded to the created allocators
+	 * @param allocationTimeout allocation timeout for the created allocators, must not be null
+	 */
+	OneSlotPerExecutionSlotAllocatorFactory(
+			final SlotProvider slotProvider,
+			final boolean slotWillBeOccupiedIndefinitely,
+			final Time allocationTimeout) {
+		// Validate both references up front, in declaration order, before assigning any field.
+		checkNotNull(slotProvider);
+		checkNotNull(allocationTimeout);
+
+		this.slotProvider = slotProvider;
+		this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
+		this.allocationTimeout = allocationTimeout;
+	}
+
+	@Override
+	public ExecutionSlotAllocator createInstance(final PreferredLocationsRetriever preferredLocationsRetriever) {
+		final ExecutionSlotAllocator allocator = new OneSlotPerExecutionSlotAllocator(
+			slotProvider,
+			preferredLocationsRetriever,
+			slotWillBeOccupiedIndefinitely,
+			allocationTimeout);
+		return allocator;
+	}
+}