You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/06/16 09:14:39 UTC

[flink] 04/07: [FLINK-17018][runtime] Introduce OneSlotPerExecutionSlotAllocator which will request one physical slot for each single execution vertex

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 95cf59a669610bfcf12a561281322113ea32cd90
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Thu Jun 11 22:57:00 2020 +0800

    [FLINK-17018][runtime] Introduce OneSlotPerExecutionSlotAllocator which will request one physical slot for each single execution vertex
    
    OneSlotPerExecutionSlotAllocator allocates slots in bulks so that the SlotProvider can check whether this bulk of slot requests can be fulfilled at the same time.
    It has several limitations:
    1. Slot sharing will be ignored.
    2. Co-location constraints are not allowed.
    3. Intra-bulk input location preferences will be ignored.
---
 .../jobmaster/slotpool/PhysicalSlotRequest.java    |   6 +-
 .../jobmaster/slotpool/SingleLogicalSlot.java      |   2 +-
 .../OneSlotPerExecutionSlotAllocator.java          | 217 +++++++++++++++
 .../OneSlotPerExecutionSlotAllocatorTest.java      | 304 +++++++++++++++++++++
 4 files changed, 525 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java
index b953e43..60030ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java
@@ -42,11 +42,11 @@ public class PhysicalSlotRequest {
 		this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
 	}
 
-	SlotRequestId getSlotRequestId() {
+	public SlotRequestId getSlotRequestId() {
 		return slotRequestId;
 	}
 
-	SlotProfile getSlotProfile() {
+	public SlotProfile getSlotProfile() {
 		return slotProfile;
 	}
 
@@ -63,7 +63,7 @@ public class PhysicalSlotRequest {
 
 		private final PhysicalSlot physicalSlot;
 
-		Result(final SlotRequestId slotRequestId, final PhysicalSlot physicalSlot) {
+		public Result(final SlotRequestId slotRequestId, final PhysicalSlot physicalSlot) {
 			this.slotRequestId = slotRequestId;
 			this.physicalSlot = physicalSlot;
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
index 710f003..f5ba44c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
@@ -166,7 +166,7 @@ public class SingleLogicalSlot implements LogicalSlot, PhysicalSlot.Payload {
 		return slotSharingGroupId;
 	}
 
-	static SingleLogicalSlot allocateFromPhysicalSlot(
+	public static SingleLogicalSlot allocateFromPhysicalSlot(
 			final SlotRequestId slotRequestId,
 			final PhysicalSlot physicalSlot,
 			final Locality locality,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocator.java
new file mode 100644
index 0000000..c350f5d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocator.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.BulkSlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This slot allocator will request one physical slot for each single execution vertex.
+ * The slots will be requested in bulks so that the {@link SlotProvider} can check
+ * whether this bulk of slot requests can be fulfilled at the same time.
+ * It has several limitations:
+ *
+ * <p>1. Slot sharing will be ignored.
+ *
+ * <p>2. Co-location constraints are not allowed.
+ *
+ * <p>3. Intra-bulk input location preferences will be ignored.
+ */
+class OneSlotPerExecutionSlotAllocator extends AbstractExecutionSlotAllocator implements SlotOwner {
+
+	private static final Logger LOG = LoggerFactory.getLogger(OneSlotPerExecutionSlotAllocator.class);
+
+	private final BulkSlotProvider slotProvider;
+
+	private final boolean slotWillBeOccupiedIndefinitely;
+
+	private final Time allocationTimeout;
+
+	OneSlotPerExecutionSlotAllocator(
+			final BulkSlotProvider slotProvider,
+			final PreferredLocationsRetriever preferredLocationsRetriever,
+			final boolean slotWillBeOccupiedIndefinitely,
+			final Time allocationTimeout) {
+
+		super(preferredLocationsRetriever);
+
+		this.slotProvider = checkNotNull(slotProvider);
+		this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
+		this.allocationTimeout = checkNotNull(allocationTimeout);
+	}
+
+	@Override
+	public List<SlotExecutionVertexAssignment> allocateSlotsFor(
+			final List<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements) {
+
+		validateSchedulingRequirements(executionVertexSchedulingRequirements);
+
+		validateNoCoLocationConstraint(executionVertexSchedulingRequirements);
+
+		// LinkedHashMap is needed to retain the given order
+		final LinkedHashMap<SlotRequestId, SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
+			createSlotExecutionVertexAssignments(executionVertexSchedulingRequirements);
+
+		final Map<ExecutionVertexID, SlotRequestId> executionVertexSlotRequestIds = slotExecutionVertexAssignments
+			.entrySet()
+			.stream()
+			.collect(Collectors.toMap(e -> e.getValue().getExecutionVertexId(), Map.Entry::getKey));
+
+		final List<CompletableFuture<PhysicalSlotRequest>> physicalSlotRequestFutures =
+			createPhysicalSlotRequestFutures(
+				executionVertexSchedulingRequirements,
+				executionVertexSlotRequestIds);
+
+		allocateSlotsForAssignments(
+			physicalSlotRequestFutures,
+			slotExecutionVertexAssignments);
+
+		return Collections.unmodifiableList(new ArrayList<>(slotExecutionVertexAssignments.values()));
+	}
+
+	private static void validateNoCoLocationConstraint(
+			final Collection<ExecutionVertexSchedulingRequirements> schedulingRequirements) {
+
+		final boolean hasCoLocationConstraint = schedulingRequirements.stream()
+			.anyMatch(r -> r.getCoLocationConstraint() != null);
+		checkState(
+			!hasCoLocationConstraint,
+			"Jobs with co-location constraints are not allowed to run with pipelined region scheduling strategy.");
+	}
+
+	private LinkedHashMap<SlotRequestId, SlotExecutionVertexAssignment> createSlotExecutionVertexAssignments(
+			final List<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements) {
+
+		final LinkedHashMap<SlotRequestId, SlotExecutionVertexAssignment> assignments = new LinkedHashMap<>();
+		for (ExecutionVertexSchedulingRequirements schedulingRequirements : executionVertexSchedulingRequirements) {
+			final ExecutionVertexID executionVertexId = schedulingRequirements.getExecutionVertexId();
+
+			final SlotRequestId slotRequestId = new SlotRequestId();
+			final SlotExecutionVertexAssignment slotExecutionVertexAssignment =
+				createAndRegisterSlotExecutionVertexAssignment(
+					executionVertexId,
+					new CompletableFuture<>(),
+					throwable -> slotProvider.cancelSlotRequest(slotRequestId, throwable));
+			assignments.put(slotRequestId, slotExecutionVertexAssignment);
+		}
+
+		return assignments;
+	}
+
+	private List<CompletableFuture<PhysicalSlotRequest>> createPhysicalSlotRequestFutures(
+			final List<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements,
+			final Map<ExecutionVertexID, SlotRequestId> executionVertexSlotRequestIds) {
+
+		final Set<AllocationID> allPreviousAllocationIds =
+			computeAllPriorAllocationIds(executionVertexSchedulingRequirements);
+
+		final List<CompletableFuture<PhysicalSlotRequest>> physicalSlotRequestFutures =
+			new ArrayList<>(executionVertexSchedulingRequirements.size());
+		for (ExecutionVertexSchedulingRequirements schedulingRequirements : executionVertexSchedulingRequirements) {
+			final ExecutionVertexID executionVertexId = schedulingRequirements.getExecutionVertexId();
+			final SlotRequestId slotRequestId = executionVertexSlotRequestIds.get(executionVertexId);
+
+			LOG.debug("Allocate slot with id {} for execution {}", slotRequestId, executionVertexId);
+
+			// use the task resource profile as the physical slot resource requirement since slot sharing is ignored
+			final CompletableFuture<SlotProfile> slotProfileFuture = getSlotProfileFuture(
+				schedulingRequirements,
+				schedulingRequirements.getTaskResourceProfile(),
+				executionVertexSlotRequestIds.keySet(),
+				allPreviousAllocationIds);
+
+			final CompletableFuture<PhysicalSlotRequest> physicalSlotRequestFuture =
+				slotProfileFuture.thenApply(
+					slotProfile -> createPhysicalSlotRequest(slotRequestId, slotProfile));
+			physicalSlotRequestFutures.add(physicalSlotRequestFuture);
+		}
+
+		return physicalSlotRequestFutures;
+	}
+
+	private PhysicalSlotRequest createPhysicalSlotRequest(
+			final SlotRequestId slotRequestId,
+			final SlotProfile slotProfile) {
+		return new PhysicalSlotRequest(slotRequestId, slotProfile, slotWillBeOccupiedIndefinitely);
+	}
+
+	private void allocateSlotsForAssignments(
+			final List<CompletableFuture<PhysicalSlotRequest>> physicalSlotRequestFutures,
+			final Map<SlotRequestId, SlotExecutionVertexAssignment> slotExecutionVertexAssignments) {
+
+		FutureUtils.combineAll(physicalSlotRequestFutures)
+			.thenCompose(physicalSlotRequests -> slotProvider.allocatePhysicalSlots(physicalSlotRequests, allocationTimeout))
+			.thenAccept(physicalSlotRequestResults -> {
+				for (PhysicalSlotRequest.Result result : physicalSlotRequestResults) {
+					final SlotRequestId slotRequestId = result.getSlotRequestId();
+					final SlotExecutionVertexAssignment assignment = slotExecutionVertexAssignments.get(slotRequestId);
+
+					checkState(assignment != null);
+
+					final LogicalSlot logicalSlot = SingleLogicalSlot.allocateFromPhysicalSlot(
+						slotRequestId,
+						result.getPhysicalSlot(),
+						Locality.UNKNOWN,
+						this,
+						slotWillBeOccupiedIndefinitely);
+					assignment.getLogicalSlotFuture().complete(logicalSlot);
+				}
+			})
+			.exceptionally(ex -> {
+				slotExecutionVertexAssignments.values().forEach(
+					assignment -> assignment.getLogicalSlotFuture().completeExceptionally(ex));
+				return null;
+			});
+	}
+
+	@Override
+	public void returnLogicalSlot(LogicalSlot logicalSlot) {
+		slotProvider.cancelSlotRequest(
+			logicalSlot.getSlotRequestId(),
+			new FlinkException("Slot is being returned to OneSlotPerExecutionSlotAllocator."));
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorTest.java
new file mode 100644
index 0000000..c74f762
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocatorTest.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.BulkSlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotTestUtils.createPhysicalSlot;
+import static org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorTestUtils.createSchedulingRequirements;
+import static org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorTestUtils.findSlotAssignmentByExecutionVertexId;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link OneSlotPerExecutionSlotAllocator}.
+ */
+public class OneSlotPerExecutionSlotAllocatorTest extends TestLogger {
+
+	private TestingBulkSlotProvider slotProvider;
+
+	@Before
+	public void setUp() throws Exception {
+		slotProvider = new TestingBulkSlotProvider();
+	}
+
+	@Test
+	public void testSucceededSlotAllocation() {
+		final ExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator();
+
+		final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
+		final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
+			createSchedulingRequirements(executionVertexId);
+
+		final Collection<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
+			executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+
+		assertThat(slotExecutionVertexAssignments, hasSize(1));
+
+		final SlotExecutionVertexAssignment slotAssignment = slotExecutionVertexAssignments.iterator().next();
+
+		assertThat(slotAssignment.getExecutionVertexId(), equalTo(executionVertexId));
+		assertThat(slotAssignment.getLogicalSlotFuture().isDone(), is(true));
+		assertThat(slotAssignment.getLogicalSlotFuture().isCompletedExceptionally(), is(false));
+	}
+
+	@Test
+	public void testFailedSlotAllocation() {
+		final OneSlotPerExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator();
+
+		final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
+		final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
+			createSchedulingRequirements(executionVertexId);
+
+		slotProvider.forceFailingSlotAllocation();
+		final Collection<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
+			executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+
+		final SlotExecutionVertexAssignment slotAssignment = slotExecutionVertexAssignments.iterator().next();
+
+		assertThat(slotAssignment.getLogicalSlotFuture().isCompletedExceptionally(), is(true));
+		assertThat(executionSlotAllocator.getPendingSlotAssignments().keySet(), hasSize(0));
+
+		final SlotRequestId slotRequestId = slotProvider.getSlotRequests().get(0).getSlotRequestId();
+		assertThat(slotProvider.getCancelledSlotRequestIds(), contains(slotRequestId));
+	}
+
+	@Test
+	public void testInterBulkInputLocationPreferencesAreRespected() {
+		final ExecutionVertexID producerId = new ExecutionVertexID(new JobVertexID(), 0);
+		final ExecutionVertexID consumerId = new ExecutionVertexID(new JobVertexID(), 0);
+
+		final TestingInputsLocationsRetriever inputsLocationsRetriever = new TestingInputsLocationsRetriever.Builder()
+			.connectConsumerToProducer(consumerId, producerId)
+			.build();
+
+		final ExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator(
+			new TestingStateLocationRetriever(),
+			inputsLocationsRetriever);
+
+		inputsLocationsRetriever.markScheduled(producerId);
+		final List<ExecutionVertexSchedulingRequirements> schedulingRequirementsForProducer =
+			createSchedulingRequirements(producerId);
+		final Collection<SlotExecutionVertexAssignment> slotExecutionVertexAssignmentsForProducer =
+			executionSlotAllocator.allocateSlotsFor(schedulingRequirementsForProducer);
+		final SlotExecutionVertexAssignment producerSlotAssignment =
+			findSlotAssignmentByExecutionVertexId(producerId, slotExecutionVertexAssignmentsForProducer);
+
+		assertThat(producerSlotAssignment.getLogicalSlotFuture().isDone(), is(true));
+
+		inputsLocationsRetriever.markScheduled(consumerId);
+		final List<ExecutionVertexSchedulingRequirements> schedulingRequirementsForConsumer =
+			createSchedulingRequirements(consumerId);
+		final Collection<SlotExecutionVertexAssignment> slotExecutionVertexAssignmentsForConsumer =
+			executionSlotAllocator.allocateSlotsFor(schedulingRequirementsForConsumer);
+		final SlotExecutionVertexAssignment consumerSlotAssignment =
+			findSlotAssignmentByExecutionVertexId(consumerId, slotExecutionVertexAssignmentsForConsumer);
+
+		assertThat(consumerSlotAssignment.getLogicalSlotFuture().isDone(), is(false));
+
+		inputsLocationsRetriever.assignTaskManagerLocation(producerId);
+
+		assertThat(consumerSlotAssignment.getLogicalSlotFuture().isDone(), is(true));
+	}
+
+	@Test
+	public void testIntraBulkInputLocationPreferencesDoNotBlockAllocation() {
+		final ExecutionVertexID producerId = new ExecutionVertexID(new JobVertexID(), 0);
+		final ExecutionVertexID consumerId = new ExecutionVertexID(new JobVertexID(), 0);
+
+		final TestingInputsLocationsRetriever inputsLocationsRetriever = new TestingInputsLocationsRetriever.Builder()
+			.connectConsumerToProducer(consumerId, producerId)
+			.build();
+
+		final ExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator(
+			new TestingStateLocationRetriever(),
+			inputsLocationsRetriever);
+
+		inputsLocationsRetriever.markScheduled(producerId);
+		inputsLocationsRetriever.markScheduled(consumerId);
+
+		final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
+			createSchedulingRequirements(producerId, consumerId);
+		final Collection<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
+			executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+
+		assertThat(slotExecutionVertexAssignments, hasSize(2));
+
+		final SlotExecutionVertexAssignment producerSlotAssignment =
+			findSlotAssignmentByExecutionVertexId(producerId, slotExecutionVertexAssignments);
+		final SlotExecutionVertexAssignment consumerSlotAssignment =
+			findSlotAssignmentByExecutionVertexId(consumerId, slotExecutionVertexAssignments);
+
+		assertThat(producerSlotAssignment.getLogicalSlotFuture().isDone(), is(true));
+		assertThat(consumerSlotAssignment.getLogicalSlotFuture().isDone(), is(true));
+	}
+
+	@Test
+	public void testCreatedSlotRequests() {
+		final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
+		final AllocationID allocationId = new AllocationID();
+		final SlotSharingGroupId sharingGroupId = new SlotSharingGroupId();
+		final ResourceProfile taskResourceProfile = ResourceProfile.fromResources(0.5, 250);
+		final ResourceProfile physicalSlotResourceProfile = ResourceProfile.fromResources(1.0, 300);
+		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+
+		final TestingStateLocationRetriever stateLocationRetriever = new TestingStateLocationRetriever();
+		stateLocationRetriever.setStateLocation(executionVertexId, taskManagerLocation);
+
+		final ExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator(
+			stateLocationRetriever,
+			new TestingInputsLocationsRetriever.Builder().build());
+
+		final List<ExecutionVertexSchedulingRequirements> schedulingRequirements = Collections.singletonList(
+			new ExecutionVertexSchedulingRequirements.Builder()
+				.withExecutionVertexId(executionVertexId)
+				.withPreviousAllocationId(allocationId)
+				.withSlotSharingGroupId(sharingGroupId)
+				.withPhysicalSlotResourceProfile(physicalSlotResourceProfile)
+				.withTaskResourceProfile(taskResourceProfile)
+				.build()
+		);
+
+		executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+		assertThat(slotProvider.getSlotRequests(), hasSize(1));
+
+		final SlotProfile requestSlotProfile = slotProvider.getSlotRequests().iterator().next().getSlotProfile();
+
+		assertThat(requestSlotProfile.getPreferredAllocations(), contains(allocationId));
+		assertThat(requestSlotProfile.getPreviousExecutionGraphAllocations(), contains(allocationId));
+		assertThat(requestSlotProfile.getTaskResourceProfile(), equalTo(taskResourceProfile));
+		assertThat(requestSlotProfile.getPreferredLocations(), contains(taskManagerLocation));
+		// task resource profile is used instead of slot sharing group resource profile since slot sharing is ignored
+		assertThat(requestSlotProfile.getPhysicalSlotResourceProfile(), equalTo(taskResourceProfile));
+	}
+
+	@Test(expected = IllegalStateException.class)
+	public void testCoLocationConstraintThrowsException() {
+		final ExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator();
+
+		final CoLocationConstraint coLocationConstraint = new CoLocationGroup().getLocationConstraint(0);
+		final List<ExecutionVertexSchedulingRequirements> schedulingRequirements = Collections.singletonList(
+			new ExecutionVertexSchedulingRequirements.Builder()
+				.withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 0))
+				.withCoLocationConstraint(coLocationConstraint)
+				.build()
+		);
+
+		executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+	}
+
+	private OneSlotPerExecutionSlotAllocator createExecutionSlotAllocator() {
+		return createExecutionSlotAllocator(
+			new TestingStateLocationRetriever(),
+			new TestingInputsLocationsRetriever.Builder().build());
+	}
+
+	private OneSlotPerExecutionSlotAllocator createExecutionSlotAllocator(
+			final StateLocationRetriever stateLocationRetriever,
+			final InputsLocationsRetriever inputsLocationsRetriever) {
+
+		return new OneSlotPerExecutionSlotAllocator(
+			slotProvider,
+			new DefaultPreferredLocationsRetriever(stateLocationRetriever, inputsLocationsRetriever),
+			true,
+			Time.seconds(10));
+	}
+
+	private static class TestingBulkSlotProvider implements BulkSlotProvider {
+
+		private final List<PhysicalSlotRequest> slotRequests = new ArrayList<>();
+
+		private final List<SlotRequestId> cancelledSlotRequestIds = new ArrayList<>();
+
+		private boolean forceFailingSlotAllocation = false;
+
+		@Override
+		public void start(ComponentMainThreadExecutor mainThreadExecutor) {
+		}
+
+		@Override
+		public CompletableFuture<Collection<PhysicalSlotRequest.Result>> allocatePhysicalSlots(
+				final Collection<PhysicalSlotRequest> physicalSlotRequests,
+				final Time timeout) {
+
+			slotRequests.addAll(physicalSlotRequests);
+
+			if (forceFailingSlotAllocation) {
+				return FutureUtils.completedExceptionally(new Exception("Forced failure"));
+			}
+
+			final List<PhysicalSlotRequest.Result> results = new ArrayList<>(physicalSlotRequests.size());
+			for (PhysicalSlotRequest request : physicalSlotRequests) {
+				final PhysicalSlotRequest.Result result = new PhysicalSlotRequest.Result(
+					request.getSlotRequestId(),
+					createPhysicalSlot());
+				results.add(result);
+			}
+			return CompletableFuture.completedFuture(results);
+		}
+
+		@Override
+		public void cancelSlotRequest(
+				final SlotRequestId slotRequestId,
+				final Throwable cause) {
+			cancelledSlotRequestIds.add(slotRequestId);
+		}
+
+		List<PhysicalSlotRequest> getSlotRequests() {
+			return Collections.unmodifiableList(slotRequests);
+		}
+
+		List<SlotRequestId> getCancelledSlotRequestIds() {
+			return Collections.unmodifiableList(cancelledSlotRequestIds);
+		}
+
+		void forceFailingSlotAllocation() {
+			this.forceFailingSlotAllocation = true;
+		}
+	}
+
+}