You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/06/16 09:14:38 UTC

[flink] 03/07: [FLINK-17018][runtime] Extract common logics of DefaultExecutionSlotAllocator into AbstractExecutionSlotAllocator

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6d9eb50862624cb2af2d77e747550e8ce28908bd
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Thu Jun 11 22:50:00 2020 +0800

    [FLINK-17018][runtime] Extract common logics of DefaultExecutionSlotAllocator into AbstractExecutionSlotAllocator
---
 .../scheduler/AbstractExecutionSlotAllocator.java  | 131 +++++++++++++++
 .../scheduler/DefaultExecutionSlotAllocator.java   | 132 +++++----------
 .../AbstractExecutionSlotAllocatorTest.java        | 178 +++++++++++++++++++++
 .../DefaultExecutionSlotAllocatorTest.java         | 128 ++++-----------
 4 files changed, 374 insertions(+), 195 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocator.java
new file mode 100644
index 0000000..d8dbf3b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocator.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Base class for all {@link ExecutionSlotAllocator}. It is responsible to allocate slots for tasks and
+ * keep the unfulfilled slot requests for further cancellation.
+ */
+abstract class AbstractExecutionSlotAllocator implements ExecutionSlotAllocator {
+
+	private final Map<ExecutionVertexID, SlotExecutionVertexAssignment> pendingSlotAssignments;
+
+	private final PreferredLocationsRetriever preferredLocationsRetriever;
+
+	AbstractExecutionSlotAllocator(final PreferredLocationsRetriever preferredLocationsRetriever) {
+		this.preferredLocationsRetriever = checkNotNull(preferredLocationsRetriever);
+		this.pendingSlotAssignments = new HashMap<>();
+	}
+
+	@Override
+	public void cancel(final ExecutionVertexID executionVertexId) {
+		final SlotExecutionVertexAssignment slotExecutionVertexAssignment = pendingSlotAssignments.get(executionVertexId);
+		if (slotExecutionVertexAssignment != null) {
+			slotExecutionVertexAssignment.getLogicalSlotFuture().cancel(false);
+		}
+	}
+
+	void validateSchedulingRequirements(final Collection<ExecutionVertexSchedulingRequirements> schedulingRequirements) {
+		schedulingRequirements.stream()
+			.map(ExecutionVertexSchedulingRequirements::getExecutionVertexId)
+			.forEach(id -> checkState(
+				!pendingSlotAssignments.containsKey(id),
+				"BUG: vertex %s tries to allocate a slot when its previous slot request is still pending", id));
+	}
+
+	SlotExecutionVertexAssignment createAndRegisterSlotExecutionVertexAssignment(
+			final ExecutionVertexID executionVertexId,
+			final CompletableFuture<LogicalSlot> logicalSlotFuture,
+			final Consumer<Throwable> slotRequestFailureHandler) {
+
+		final SlotExecutionVertexAssignment slotExecutionVertexAssignment =
+			new SlotExecutionVertexAssignment(executionVertexId, logicalSlotFuture);
+
+		// add to map first in case the slot future is already completed
+		pendingSlotAssignments.put(executionVertexId, slotExecutionVertexAssignment);
+
+		logicalSlotFuture.whenComplete(
+			(ignored, throwable) -> {
+				pendingSlotAssignments.remove(executionVertexId);
+				if (throwable != null) {
+					slotRequestFailureHandler.accept(throwable);
+				}
+			});
+
+		return slotExecutionVertexAssignment;
+	}
+
+	CompletableFuture<SlotProfile> getSlotProfileFuture(
+			final ExecutionVertexSchedulingRequirements schedulingRequirements,
+			final ResourceProfile physicalSlotResourceProfile,
+			final Set<ExecutionVertexID> producersToIgnore,
+			final Set<AllocationID> allPreviousAllocationIds) {
+
+		final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture =
+			preferredLocationsRetriever.getPreferredLocations(
+				schedulingRequirements.getExecutionVertexId(),
+				producersToIgnore);
+
+		return preferredLocationsFuture.thenApply(
+			preferredLocations ->
+				SlotProfile.priorAllocation(
+					schedulingRequirements.getTaskResourceProfile(),
+					physicalSlotResourceProfile,
+					preferredLocations,
+					Collections.singletonList(schedulingRequirements.getPreviousAllocationId()),
+					allPreviousAllocationIds));
+	}
+
+	@VisibleForTesting
+	static Set<AllocationID> computeAllPriorAllocationIds(
+			final Collection<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements) {
+
+		return executionVertexSchedulingRequirements
+			.stream()
+			.map(ExecutionVertexSchedulingRequirements::getPreviousAllocationId)
+			.filter(Objects::nonNull)
+			.collect(Collectors.toSet());
+	}
+
+	@VisibleForTesting
+	Map<ExecutionVertexID, SlotExecutionVertexAssignment> getPendingSlotAssignments() {
+		return Collections.unmodifiableMap(pendingSlotAssignments);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.java
index c7b2dd9..07da10e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.scheduler;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.executiongraph.SlotProviderStrategy;
@@ -28,49 +27,34 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Default {@link ExecutionSlotAllocator} which will use {@link SlotProvider} to allocate slots and
  * keep the unfulfilled requests for further cancellation.
  */
-public class DefaultExecutionSlotAllocator implements ExecutionSlotAllocator {
+public class DefaultExecutionSlotAllocator extends AbstractExecutionSlotAllocator {
 
 	private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutionSlotAllocator.class);
 
-	/**
-	 * Store the uncompleted slot assignments.
-	 */
-	private final Map<ExecutionVertexID, SlotExecutionVertexAssignment> pendingSlotAssignments;
-
 	private final SlotProviderStrategy slotProviderStrategy;
 
-	private final PreferredLocationsRetriever preferredLocationsRetriever;
-
 	public DefaultExecutionSlotAllocator(
 			final SlotProviderStrategy slotProviderStrategy,
 			final PreferredLocationsRetriever preferredLocationsRetriever) {
-		this.slotProviderStrategy = checkNotNull(slotProviderStrategy);
-		this.preferredLocationsRetriever = checkNotNull(preferredLocationsRetriever);
 
-		pendingSlotAssignments = new HashMap<>();
+		super(preferredLocationsRetriever);
+		this.slotProviderStrategy = checkNotNull(slotProviderStrategy);
 	}
 
 	@Override
@@ -86,89 +70,49 @@ public class DefaultExecutionSlotAllocator implements ExecutionSlotAllocator {
 
 		for (ExecutionVertexSchedulingRequirements schedulingRequirements : executionVertexSchedulingRequirements) {
 			final ExecutionVertexID executionVertexId = schedulingRequirements.getExecutionVertexId();
-			final SlotRequestId slotRequestId = new SlotRequestId();
 			final SlotSharingGroupId slotSharingGroupId = schedulingRequirements.getSlotSharingGroupId();
 
-			LOG.debug("Allocate slot with id {} for execution {}", slotRequestId, executionVertexId);
-
-			CompletableFuture<LogicalSlot> slotFuture = calculatePreferredLocations(
-					executionVertexId).thenCompose(
-							(Collection<TaskManagerLocation> preferredLocations) ->
-								slotProviderStrategy.allocateSlot(
-									slotRequestId,
-									new ScheduledUnit(
-										executionVertexId,
-										slotSharingGroupId,
-										schedulingRequirements.getCoLocationConstraint()),
-									SlotProfile.priorAllocation(
-										schedulingRequirements.getTaskResourceProfile(),
-										schedulingRequirements.getPhysicalSlotResourceProfile(),
-										preferredLocations,
-										Collections.singletonList(schedulingRequirements.getPreviousAllocationId()),
-										allPreviousAllocationIds)));
-
-			SlotExecutionVertexAssignment slotExecutionVertexAssignment =
-					new SlotExecutionVertexAssignment(executionVertexId, slotFuture);
-			// add to map first to avoid the future completed before added.
-			pendingSlotAssignments.put(executionVertexId, slotExecutionVertexAssignment);
-
-			slotFuture.whenComplete(
-					(ignored, throwable) -> {
-						pendingSlotAssignments.remove(executionVertexId);
-						if (throwable != null) {
-							slotProviderStrategy.cancelSlotRequest(slotRequestId, slotSharingGroupId, throwable);
-						}
-					});
-
-			slotExecutionVertexAssignments.add(slotExecutionVertexAssignment);
-		}
+			final SlotRequestId slotRequestId = new SlotRequestId();
 
-		return slotExecutionVertexAssignments;
-	}
+			final CompletableFuture<LogicalSlot> slotFuture = allocateSlot(
+				schedulingRequirements,
+				slotRequestId,
+				allPreviousAllocationIds);
 
-	private void validateSchedulingRequirements(Collection<ExecutionVertexSchedulingRequirements> schedulingRequirements) {
-		schedulingRequirements.stream()
-			.map(ExecutionVertexSchedulingRequirements::getExecutionVertexId)
-			.forEach(id -> checkState(
-				!pendingSlotAssignments.containsKey(id),
-				"BUG: vertex %s tries to allocate a slot when its previous slot request is still pending", id));
-	}
+			final SlotExecutionVertexAssignment slotExecutionVertexAssignment =
+				createAndRegisterSlotExecutionVertexAssignment(
+					executionVertexId,
+					slotFuture,
+					throwable -> slotProviderStrategy.cancelSlotRequest(slotRequestId, slotSharingGroupId, throwable));
 
-	@Override
-	public void cancel(ExecutionVertexID executionVertexId) {
-		SlotExecutionVertexAssignment slotExecutionVertexAssignment = pendingSlotAssignments.get(executionVertexId);
-		if (slotExecutionVertexAssignment != null) {
-			slotExecutionVertexAssignment.getLogicalSlotFuture().cancel(false);
+			slotExecutionVertexAssignments.add(slotExecutionVertexAssignment);
 		}
-	}
-
-	/**
-	 * Calculates the preferred locations for an execution.
-	 * It will first try to use preferred locations based on state,
-	 * if null, will use the preferred locations based on inputs.
-	 */
-	private CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(
-			ExecutionVertexID executionVertexId) {
-		return preferredLocationsRetriever.getPreferredLocations(executionVertexId, Collections.emptySet());
-	}
 
-	/**
-	 * Computes and returns a set with the prior allocation ids from all execution vertices scheduled together.
-	 *
-	 * @param executionVertexSchedulingRequirements contains the execution vertices which are scheduled together
-	 */
-	@VisibleForTesting
-	static Set<AllocationID> computeAllPriorAllocationIds(
-			Collection<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements) {
-		return executionVertexSchedulingRequirements
-			.stream()
-			.map(ExecutionVertexSchedulingRequirements::getPreviousAllocationId)
-			.filter(Objects::nonNull)
-			.collect(Collectors.toSet());
+		return slotExecutionVertexAssignments;
 	}
 
-	@VisibleForTesting
-	int getNumberOfPendingSlotAssignments() {
-		return pendingSlotAssignments.size();
+	private CompletableFuture<LogicalSlot> allocateSlot(
+			final ExecutionVertexSchedulingRequirements schedulingRequirements,
+			final SlotRequestId slotRequestId,
+			final Set<AllocationID> allPreviousAllocationIds) {
+
+		final ExecutionVertexID executionVertexId = schedulingRequirements.getExecutionVertexId();
+
+		LOG.debug("Allocate slot with id {} for execution {}", slotRequestId, executionVertexId);
+
+		final CompletableFuture<SlotProfile> slotProfileFuture = getSlotProfileFuture(
+			schedulingRequirements,
+			schedulingRequirements.getPhysicalSlotResourceProfile(),
+			Collections.emptySet(),
+			allPreviousAllocationIds);
+
+		return slotProfileFuture.thenCompose(
+			slotProfile -> slotProviderStrategy.allocateSlot(
+				slotRequestId,
+				new ScheduledUnit(
+					executionVertexId,
+					schedulingRequirements.getSlotSharingGroupId(),
+					schedulingRequirements.getCoLocationConstraint()),
+				slotProfile));
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocatorTest.java
new file mode 100644
index 0000000..4077930
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocatorTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link AbstractExecutionSlotAllocator}.
+ */
+public class AbstractExecutionSlotAllocatorTest extends TestLogger {
+
+	private AbstractExecutionSlotAllocator executionSlotAllocator;
+
+	@Before
+	public void setUp() throws Exception {
+		executionSlotAllocator = new TestingExecutionSlotAllocator();
+	}
+
+	@Test
+	public void testCancel() {
+		final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
+
+		final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
+			createSchedulingRequirements(executionVertexId);
+		final List<SlotExecutionVertexAssignment> assignments =
+			executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+
+		executionSlotAllocator.cancel(executionVertexId);
+
+		assertThat(assignments.get(0).getLogicalSlotFuture().isCancelled(), is(true));
+	}
+
+	@Test(expected = IllegalStateException.class)
+	public void testValidateSchedulingRequirements() {
+		final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
+
+		final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
+			createSchedulingRequirements(executionVertexId);
+		executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+
+		executionSlotAllocator.validateSchedulingRequirements(schedulingRequirements);
+	}
+
+	@Test
+	public void testCreateAndRegisterSlotExecutionVertexAssignment() {
+		final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
+
+		final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
+			createSchedulingRequirements(executionVertexId);
+		final List<SlotExecutionVertexAssignment> assignments =
+			executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+
+		assertThat(assignments, hasSize(1));
+
+		final SlotExecutionVertexAssignment assignment = assignments.get(0);
+		assertThat(assignment.getExecutionVertexId(), is(executionVertexId));
+		assertThat(assignment.getLogicalSlotFuture().isDone(), is(false));
+		assertThat(executionSlotAllocator.getPendingSlotAssignments().values(), contains(assignment));
+
+		assignment.getLogicalSlotFuture().cancel(false);
+
+		assertThat(executionSlotAllocator.getPendingSlotAssignments().keySet(), hasSize(0));
+	}
+
+	@Test
+	public void testCompletedExecutionVertexAssignmentWillBeUnregistered() {
+		final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
+
+		final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
+			createSchedulingRequirements(executionVertexId);
+		final List<SlotExecutionVertexAssignment> assignments =
+			executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+
+		assignments.get(0).getLogicalSlotFuture().cancel(false);
+
+		assertThat(executionSlotAllocator.getPendingSlotAssignments().keySet(), hasSize(0));
+	}
+
+	@Test
+	public void testComputeAllPriorAllocationIds() {
+		final List<AllocationID> expectAllocationIds = Arrays.asList(new AllocationID(), new AllocationID());
+		final List<ExecutionVertexSchedulingRequirements> testSchedulingRequirements = Arrays.asList(
+			new ExecutionVertexSchedulingRequirements.Builder().
+				withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 0)).
+				withPreviousAllocationId(expectAllocationIds.get(0)).
+				build(),
+			new ExecutionVertexSchedulingRequirements.Builder().
+				withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 1)).
+				withPreviousAllocationId(expectAllocationIds.get(0)).
+				build(),
+			new ExecutionVertexSchedulingRequirements.Builder().
+				withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 2)).
+				withPreviousAllocationId(expectAllocationIds.get(1)).
+				build(),
+			new ExecutionVertexSchedulingRequirements.Builder().
+				withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 3)).
+				build()
+		);
+
+		final Set<AllocationID> allPriorAllocationIds =
+			AbstractExecutionSlotAllocator.computeAllPriorAllocationIds(testSchedulingRequirements);
+		assertThat(allPriorAllocationIds, containsInAnyOrder(expectAllocationIds.toArray()));
+	}
+
+	private List<ExecutionVertexSchedulingRequirements> createSchedulingRequirements(
+			final ExecutionVertexID... executionVertexIds) {
+
+		final List<ExecutionVertexSchedulingRequirements> schedulingRequirements = new ArrayList<>(executionVertexIds.length);
+
+		for (ExecutionVertexID executionVertexId : executionVertexIds) {
+			schedulingRequirements.add(new ExecutionVertexSchedulingRequirements.Builder()
+				.withExecutionVertexId(executionVertexId).build());
+		}
+		return schedulingRequirements;
+	}
+
+	private static class TestingExecutionSlotAllocator extends AbstractExecutionSlotAllocator {
+
+		TestingExecutionSlotAllocator() {
+			super(
+				new DefaultPreferredLocationsRetriever(
+					new TestingStateLocationRetriever(),
+					new TestingInputsLocationsRetriever.Builder().build()));
+		}
+
+		@Override
+		public List<SlotExecutionVertexAssignment> allocateSlotsFor(
+				final List<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements) {
+
+			final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
+				new ArrayList<>(executionVertexSchedulingRequirements.size());
+
+			for (ExecutionVertexSchedulingRequirements schedulingRequirements : executionVertexSchedulingRequirements) {
+				slotExecutionVertexAssignments.add(
+					createAndRegisterSlotExecutionVertexAssignment(
+						schedulingRequirements.getExecutionVertexId(),
+						new CompletableFuture<>(),
+						throwable -> {}));
+			}
+
+			return slotExecutionVertexAssignments;
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
index ad74357..0646a32 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
@@ -49,18 +49,12 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
 
 import static org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorTestUtils.createSchedulingRequirements;
 import static org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorTestUtils.findSlotAssignmentByExecutionVertexId;
 import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -111,7 +105,7 @@ public class DefaultExecutionSlotAllocatorTest extends TestLogger {
 		inputsLocationsRetriever.assignTaskManagerLocation(producerId);
 
 		assertTrue(consumerSlotAssignment.getLogicalSlotFuture().isDone());
-		assertEquals(0, executionSlotAllocator.getNumberOfPendingSlotAssignments());
+		assertThat(executionSlotAllocator.getPendingSlotAssignments().keySet(), hasSize(0));
 	}
 
 	/**
@@ -160,109 +154,47 @@ public class DefaultExecutionSlotAllocatorTest extends TestLogger {
 		assertThat(expectedSlotProfile.getPreferredLocations(), contains(taskManagerLocation));
 	}
 
-	/**
-	 * Tests that cancels an execution vertex which is not existed.
-	 */
 	@Test
-	public void testCancelNonExistingExecutionVertex() {
-		final DefaultExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator();
-
-		ExecutionVertexID inValidExecutionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
-		executionSlotAllocator.cancel(inValidExecutionVertexId);
-
-		assertThat(slotProvider.getCancelledSlotRequestIds(), is(empty()));
-	}
-
-	/**
-	 * Tests that cancels a slot request which has already been fulfilled.
-	 */
-	@Test
-	public void testCancelFulfilledSlotRequest() {
-		final ExecutionVertexID producerId = new ExecutionVertexID(new JobVertexID(), 0);
+	public void testDuplicatedSlotAllocationIsNotAllowed() {
+		final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
 
 		final DefaultExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator();
+		slotProvider.disableSlotAllocation();
 
 		final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
-				createSchedulingRequirements(producerId);
+			createSchedulingRequirements(executionVertexId);
 		executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
 
-		executionSlotAllocator.cancel(producerId);
-
-		assertThat(slotProvider.getCancelledSlotRequestIds(), is(empty()));
+		try {
+			executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+			fail("exception should happen");
+		} catch (IllegalStateException e) {
+			// IllegalStateException is expected
+		}
 	}
 
-	/**
-	 * Tests that cancels a slot request which has not been fulfilled.
-	 */
 	@Test
-	public void testCancelUnFulfilledSlotRequest() throws Exception {
-		final ExecutionVertexID producerId = new ExecutionVertexID(new JobVertexID(), 0);
-
+	public void testSlotAssignmentIsProperlyRegistered() {
 		final DefaultExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator();
 
-		slotProvider.disableSlotAllocation();
+		final ExecutionVertexID executionVertexID = new ExecutionVertexID(new JobVertexID(), 0);
 		final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
-				createSchedulingRequirements(producerId);
-		Collection<SlotExecutionVertexAssignment> assignments = executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
-
-		executionSlotAllocator.cancel(producerId);
-
-		assertThat(slotProvider.getCancelledSlotRequestIds(), hasSize(1));
-		assertThat(slotProvider.getCancelledSlotRequestIds(), contains(slotProvider.getReceivedSlotRequestIds().toArray()));
-
-		try {
-			assignments.iterator().next().getLogicalSlotFuture().get();
-			fail("Expect a CancellationException but got nothing.");
-		} catch (CancellationException ignored) {
-			// Expected exception
-		}
-	}
+			createSchedulingRequirements(executionVertexID);
 
-	/**
-	 * Tests that all prior allocation ids are computed by union all previous allocation ids in scheduling requirements.
-	 */
-	@Test
-	public void testComputeAllPriorAllocationIds() {
-		List<AllocationID> expectAllocationIds = Arrays.asList(new AllocationID(), new AllocationID());
-		List<ExecutionVertexSchedulingRequirements> testSchedulingRequirements = Arrays.asList(
-				new ExecutionVertexSchedulingRequirements.Builder().
-						withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 0)).
-						withPreviousAllocationId(expectAllocationIds.get(0)).
-						build(),
-				new ExecutionVertexSchedulingRequirements.Builder().
-						withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 1)).
-						withPreviousAllocationId(expectAllocationIds.get(0)).
-						build(),
-				new ExecutionVertexSchedulingRequirements.Builder().
-						withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 2)).
-						withPreviousAllocationId(expectAllocationIds.get(1)).
-						build(),
-				new ExecutionVertexSchedulingRequirements.Builder().
-						withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 3)).
-						build()
-		);
+		slotProvider.disableSlotAllocation();
+		final Collection<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
+			executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
 
-		Set<AllocationID> allPriorAllocationIds = DefaultExecutionSlotAllocator.computeAllPriorAllocationIds(testSchedulingRequirements);
-		assertThat(allPriorAllocationIds, containsInAnyOrder(expectAllocationIds.toArray()));
-	}
+		final SlotExecutionVertexAssignment slotAssignment = slotExecutionVertexAssignments.iterator().next();
 
-	@Test
-	public void testDuplicatedSlotAllocationIsNotAllowed() {
-		final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
+		assertThat(executionSlotAllocator.getPendingSlotAssignments().values(), contains(slotAssignment));
 
-		final DefaultExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator();
-		slotProvider.disableSlotAllocation();
+		executionSlotAllocator.cancel(executionVertexID);
 
-		final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
-			createSchedulingRequirements(executionVertexId);
-		executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+		assertThat(executionSlotAllocator.getPendingSlotAssignments().keySet(), hasSize(0));
 
-		try {
-			executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
-			fail("exception should happen");
-		} catch (IllegalStateException e) {
-			// IllegalStateException is expected
-		}
+		final SlotRequestId slotRequestId = slotProvider.slotAllocationRequests.get(0).f0;
+		assertThat(slotProvider.getCancelledSlotRequestIds(), contains(slotRequestId));
 	}
 
 	private DefaultExecutionSlotAllocator createExecutionSlotAllocator() {
@@ -317,18 +249,12 @@ public class DefaultExecutionSlotAllocatorTest extends TestLogger {
 			return Collections.unmodifiableList(slotAllocationRequests);
 		}
 
-		public List<SlotRequestId> getReceivedSlotRequestIds() {
-			return slotAllocationRequests.stream()
-					.map(requestTuple -> requestTuple.f0)
-					.collect(Collectors.toList());
-		}
-
-		public List<SlotRequestId> getCancelledSlotRequestIds() {
-			return Collections.unmodifiableList(cancelledSlotRequestIds);
-		}
-
 		public void disableSlotAllocation() {
 			slotAllocationDisabled = true;
 		}
+
+		List<SlotRequestId> getCancelledSlotRequestIds() {
+			return cancelledSlotRequestIds;
+		}
 	}
 }