You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/06/16 09:14:38 UTC
[flink] 03/07: [FLINK-17018][runtime] Extract common logics of
DefaultExecutionSlotAllocator into AbstractExecutionSlotAllocator
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6d9eb50862624cb2af2d77e747550e8ce28908bd
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Thu Jun 11 22:50:00 2020 +0800
[FLINK-17018][runtime] Extract common logics of DefaultExecutionSlotAllocator into AbstractExecutionSlotAllocator
---
.../scheduler/AbstractExecutionSlotAllocator.java | 131 +++++++++++++++
.../scheduler/DefaultExecutionSlotAllocator.java | 132 +++++----------
.../AbstractExecutionSlotAllocatorTest.java | 178 +++++++++++++++++++++
.../DefaultExecutionSlotAllocatorTest.java | 128 ++++-----------
4 files changed, 374 insertions(+), 195 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocator.java
new file mode 100644
index 0000000..d8dbf3b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocator.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Base class for all {@link ExecutionSlotAllocator}. It is responsible to allocate slots for tasks and
+ * keep the unfulfilled slot requests for further cancellation.
+ */
+abstract class AbstractExecutionSlotAllocator implements ExecutionSlotAllocator {
+
+ private final Map<ExecutionVertexID, SlotExecutionVertexAssignment> pendingSlotAssignments;
+
+ private final PreferredLocationsRetriever preferredLocationsRetriever;
+
+ AbstractExecutionSlotAllocator(final PreferredLocationsRetriever preferredLocationsRetriever) {
+ this.preferredLocationsRetriever = checkNotNull(preferredLocationsRetriever);
+ this.pendingSlotAssignments = new HashMap<>();
+ }
+
+ @Override
+ public void cancel(final ExecutionVertexID executionVertexId) {
+ final SlotExecutionVertexAssignment slotExecutionVertexAssignment = pendingSlotAssignments.get(executionVertexId);
+ if (slotExecutionVertexAssignment != null) {
+ slotExecutionVertexAssignment.getLogicalSlotFuture().cancel(false);
+ }
+ }
+
+ void validateSchedulingRequirements(final Collection<ExecutionVertexSchedulingRequirements> schedulingRequirements) {
+ schedulingRequirements.stream()
+ .map(ExecutionVertexSchedulingRequirements::getExecutionVertexId)
+ .forEach(id -> checkState(
+ !pendingSlotAssignments.containsKey(id),
+ "BUG: vertex %s tries to allocate a slot when its previous slot request is still pending", id));
+ }
+
+ SlotExecutionVertexAssignment createAndRegisterSlotExecutionVertexAssignment(
+ final ExecutionVertexID executionVertexId,
+ final CompletableFuture<LogicalSlot> logicalSlotFuture,
+ final Consumer<Throwable> slotRequestFailureHandler) {
+
+ final SlotExecutionVertexAssignment slotExecutionVertexAssignment =
+ new SlotExecutionVertexAssignment(executionVertexId, logicalSlotFuture);
+
+ // add to map first in case the slot future is already completed
+ pendingSlotAssignments.put(executionVertexId, slotExecutionVertexAssignment);
+
+ logicalSlotFuture.whenComplete(
+ (ignored, throwable) -> {
+ pendingSlotAssignments.remove(executionVertexId);
+ if (throwable != null) {
+ slotRequestFailureHandler.accept(throwable);
+ }
+ });
+
+ return slotExecutionVertexAssignment;
+ }
+
+ CompletableFuture<SlotProfile> getSlotProfileFuture(
+ final ExecutionVertexSchedulingRequirements schedulingRequirements,
+ final ResourceProfile physicalSlotResourceProfile,
+ final Set<ExecutionVertexID> producersToIgnore,
+ final Set<AllocationID> allPreviousAllocationIds) {
+
+ final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture =
+ preferredLocationsRetriever.getPreferredLocations(
+ schedulingRequirements.getExecutionVertexId(),
+ producersToIgnore);
+
+ return preferredLocationsFuture.thenApply(
+ preferredLocations ->
+ SlotProfile.priorAllocation(
+ schedulingRequirements.getTaskResourceProfile(),
+ physicalSlotResourceProfile,
+ preferredLocations,
+ Collections.singletonList(schedulingRequirements.getPreviousAllocationId()),
+ allPreviousAllocationIds));
+ }
+
+ @VisibleForTesting
+ static Set<AllocationID> computeAllPriorAllocationIds(
+ final Collection<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements) {
+
+ return executionVertexSchedulingRequirements
+ .stream()
+ .map(ExecutionVertexSchedulingRequirements::getPreviousAllocationId)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+ }
+
+ @VisibleForTesting
+ Map<ExecutionVertexID, SlotExecutionVertexAssignment> getPendingSlotAssignments() {
+ return Collections.unmodifiableMap(pendingSlotAssignments);
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.java
index c7b2dd9..07da10e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocator.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.scheduler;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.executiongraph.SlotProviderStrategy;
@@ -28,49 +27,34 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
/**
* Default {@link ExecutionSlotAllocator} which will use {@link SlotProvider} to allocate slots and
* keep the unfulfilled requests for further cancellation.
*/
-public class DefaultExecutionSlotAllocator implements ExecutionSlotAllocator {
+public class DefaultExecutionSlotAllocator extends AbstractExecutionSlotAllocator {
private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutionSlotAllocator.class);
- /**
- * Store the uncompleted slot assignments.
- */
- private final Map<ExecutionVertexID, SlotExecutionVertexAssignment> pendingSlotAssignments;
-
private final SlotProviderStrategy slotProviderStrategy;
- private final PreferredLocationsRetriever preferredLocationsRetriever;
-
public DefaultExecutionSlotAllocator(
final SlotProviderStrategy slotProviderStrategy,
final PreferredLocationsRetriever preferredLocationsRetriever) {
- this.slotProviderStrategy = checkNotNull(slotProviderStrategy);
- this.preferredLocationsRetriever = checkNotNull(preferredLocationsRetriever);
- pendingSlotAssignments = new HashMap<>();
+ super(preferredLocationsRetriever);
+ this.slotProviderStrategy = checkNotNull(slotProviderStrategy);
}
@Override
@@ -86,89 +70,49 @@ public class DefaultExecutionSlotAllocator implements ExecutionSlotAllocator {
for (ExecutionVertexSchedulingRequirements schedulingRequirements : executionVertexSchedulingRequirements) {
final ExecutionVertexID executionVertexId = schedulingRequirements.getExecutionVertexId();
- final SlotRequestId slotRequestId = new SlotRequestId();
final SlotSharingGroupId slotSharingGroupId = schedulingRequirements.getSlotSharingGroupId();
- LOG.debug("Allocate slot with id {} for execution {}", slotRequestId, executionVertexId);
-
- CompletableFuture<LogicalSlot> slotFuture = calculatePreferredLocations(
- executionVertexId).thenCompose(
- (Collection<TaskManagerLocation> preferredLocations) ->
- slotProviderStrategy.allocateSlot(
- slotRequestId,
- new ScheduledUnit(
- executionVertexId,
- slotSharingGroupId,
- schedulingRequirements.getCoLocationConstraint()),
- SlotProfile.priorAllocation(
- schedulingRequirements.getTaskResourceProfile(),
- schedulingRequirements.getPhysicalSlotResourceProfile(),
- preferredLocations,
- Collections.singletonList(schedulingRequirements.getPreviousAllocationId()),
- allPreviousAllocationIds)));
-
- SlotExecutionVertexAssignment slotExecutionVertexAssignment =
- new SlotExecutionVertexAssignment(executionVertexId, slotFuture);
- // add to map first to avoid the future completed before added.
- pendingSlotAssignments.put(executionVertexId, slotExecutionVertexAssignment);
-
- slotFuture.whenComplete(
- (ignored, throwable) -> {
- pendingSlotAssignments.remove(executionVertexId);
- if (throwable != null) {
- slotProviderStrategy.cancelSlotRequest(slotRequestId, slotSharingGroupId, throwable);
- }
- });
-
- slotExecutionVertexAssignments.add(slotExecutionVertexAssignment);
- }
+ final SlotRequestId slotRequestId = new SlotRequestId();
- return slotExecutionVertexAssignments;
- }
+ final CompletableFuture<LogicalSlot> slotFuture = allocateSlot(
+ schedulingRequirements,
+ slotRequestId,
+ allPreviousAllocationIds);
- private void validateSchedulingRequirements(Collection<ExecutionVertexSchedulingRequirements> schedulingRequirements) {
- schedulingRequirements.stream()
- .map(ExecutionVertexSchedulingRequirements::getExecutionVertexId)
- .forEach(id -> checkState(
- !pendingSlotAssignments.containsKey(id),
- "BUG: vertex %s tries to allocate a slot when its previous slot request is still pending", id));
- }
+ final SlotExecutionVertexAssignment slotExecutionVertexAssignment =
+ createAndRegisterSlotExecutionVertexAssignment(
+ executionVertexId,
+ slotFuture,
+ throwable -> slotProviderStrategy.cancelSlotRequest(slotRequestId, slotSharingGroupId, throwable));
- @Override
- public void cancel(ExecutionVertexID executionVertexId) {
- SlotExecutionVertexAssignment slotExecutionVertexAssignment = pendingSlotAssignments.get(executionVertexId);
- if (slotExecutionVertexAssignment != null) {
- slotExecutionVertexAssignment.getLogicalSlotFuture().cancel(false);
+ slotExecutionVertexAssignments.add(slotExecutionVertexAssignment);
}
- }
-
- /**
- * Calculates the preferred locations for an execution.
- * It will first try to use preferred locations based on state,
- * if null, will use the preferred locations based on inputs.
- */
- private CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(
- ExecutionVertexID executionVertexId) {
- return preferredLocationsRetriever.getPreferredLocations(executionVertexId, Collections.emptySet());
- }
- /**
- * Computes and returns a set with the prior allocation ids from all execution vertices scheduled together.
- *
- * @param executionVertexSchedulingRequirements contains the execution vertices which are scheduled together
- */
- @VisibleForTesting
- static Set<AllocationID> computeAllPriorAllocationIds(
- Collection<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements) {
- return executionVertexSchedulingRequirements
- .stream()
- .map(ExecutionVertexSchedulingRequirements::getPreviousAllocationId)
- .filter(Objects::nonNull)
- .collect(Collectors.toSet());
+ return slotExecutionVertexAssignments;
}
- @VisibleForTesting
- int getNumberOfPendingSlotAssignments() {
- return pendingSlotAssignments.size();
+ private CompletableFuture<LogicalSlot> allocateSlot(
+ final ExecutionVertexSchedulingRequirements schedulingRequirements,
+ final SlotRequestId slotRequestId,
+ final Set<AllocationID> allPreviousAllocationIds) {
+
+ final ExecutionVertexID executionVertexId = schedulingRequirements.getExecutionVertexId();
+
+ LOG.debug("Allocate slot with id {} for execution {}", slotRequestId, executionVertexId);
+
+ final CompletableFuture<SlotProfile> slotProfileFuture = getSlotProfileFuture(
+ schedulingRequirements,
+ schedulingRequirements.getPhysicalSlotResourceProfile(),
+ Collections.emptySet(),
+ allPreviousAllocationIds);
+
+ return slotProfileFuture.thenCompose(
+ slotProfile -> slotProviderStrategy.allocateSlot(
+ slotRequestId,
+ new ScheduledUnit(
+ executionVertexId,
+ schedulingRequirements.getSlotSharingGroupId(),
+ schedulingRequirements.getCoLocationConstraint()),
+ slotProfile));
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocatorTest.java
new file mode 100644
index 0000000..4077930
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/AbstractExecutionSlotAllocatorTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link AbstractExecutionSlotAllocator}.
+ */
+public class AbstractExecutionSlotAllocatorTest extends TestLogger {
+
+ private AbstractExecutionSlotAllocator executionSlotAllocator;
+
+ @Before
+ public void setUp() throws Exception {
+ executionSlotAllocator = new TestingExecutionSlotAllocator();
+ }
+
+ @Test
+ public void testCancel() {
+ final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
+
+ final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
+ createSchedulingRequirements(executionVertexId);
+ final List<SlotExecutionVertexAssignment> assignments =
+ executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+
+ executionSlotAllocator.cancel(executionVertexId);
+
+ assertThat(assignments.get(0).getLogicalSlotFuture().isCancelled(), is(true));
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testValidateSchedulingRequirements() {
+ final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
+
+ final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
+ createSchedulingRequirements(executionVertexId);
+ executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+
+ executionSlotAllocator.validateSchedulingRequirements(schedulingRequirements);
+ }
+
+ @Test
+ public void testCreateAndRegisterSlotExecutionVertexAssignment() {
+ final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
+
+ final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
+ createSchedulingRequirements(executionVertexId);
+ final List<SlotExecutionVertexAssignment> assignments =
+ executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+
+ assertThat(assignments, hasSize(1));
+
+ final SlotExecutionVertexAssignment assignment = assignments.get(0);
+ assertThat(assignment.getExecutionVertexId(), is(executionVertexId));
+ assertThat(assignment.getLogicalSlotFuture().isDone(), is(false));
+ assertThat(executionSlotAllocator.getPendingSlotAssignments().values(), contains(assignment));
+
+ assignment.getLogicalSlotFuture().cancel(false);
+
+ assertThat(executionSlotAllocator.getPendingSlotAssignments().keySet(), hasSize(0));
+ }
+
+ @Test
+ public void testCompletedExecutionVertexAssignmentWillBeUnregistered() {
+ final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
+
+ final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
+ createSchedulingRequirements(executionVertexId);
+ final List<SlotExecutionVertexAssignment> assignments =
+ executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+
+ assignments.get(0).getLogicalSlotFuture().cancel(false);
+
+ assertThat(executionSlotAllocator.getPendingSlotAssignments().keySet(), hasSize(0));
+ }
+
+ @Test
+ public void testComputeAllPriorAllocationIds() {
+ final List<AllocationID> expectAllocationIds = Arrays.asList(new AllocationID(), new AllocationID());
+ final List<ExecutionVertexSchedulingRequirements> testSchedulingRequirements = Arrays.asList(
+ new ExecutionVertexSchedulingRequirements.Builder().
+ withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 0)).
+ withPreviousAllocationId(expectAllocationIds.get(0)).
+ build(),
+ new ExecutionVertexSchedulingRequirements.Builder().
+ withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 1)).
+ withPreviousAllocationId(expectAllocationIds.get(0)).
+ build(),
+ new ExecutionVertexSchedulingRequirements.Builder().
+ withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 2)).
+ withPreviousAllocationId(expectAllocationIds.get(1)).
+ build(),
+ new ExecutionVertexSchedulingRequirements.Builder().
+ withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 3)).
+ build()
+ );
+
+ final Set<AllocationID> allPriorAllocationIds =
+ AbstractExecutionSlotAllocator.computeAllPriorAllocationIds(testSchedulingRequirements);
+ assertThat(allPriorAllocationIds, containsInAnyOrder(expectAllocationIds.toArray()));
+ }
+
+ private List<ExecutionVertexSchedulingRequirements> createSchedulingRequirements(
+ final ExecutionVertexID... executionVertexIds) {
+
+ final List<ExecutionVertexSchedulingRequirements> schedulingRequirements = new ArrayList<>(executionVertexIds.length);
+
+ for (ExecutionVertexID executionVertexId : executionVertexIds) {
+ schedulingRequirements.add(new ExecutionVertexSchedulingRequirements.Builder()
+ .withExecutionVertexId(executionVertexId).build());
+ }
+ return schedulingRequirements;
+ }
+
+ private static class TestingExecutionSlotAllocator extends AbstractExecutionSlotAllocator {
+
+ TestingExecutionSlotAllocator() {
+ super(
+ new DefaultPreferredLocationsRetriever(
+ new TestingStateLocationRetriever(),
+ new TestingInputsLocationsRetriever.Builder().build()));
+ }
+
+ @Override
+ public List<SlotExecutionVertexAssignment> allocateSlotsFor(
+ final List<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements) {
+
+ final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
+ new ArrayList<>(executionVertexSchedulingRequirements.size());
+
+ for (ExecutionVertexSchedulingRequirements schedulingRequirements : executionVertexSchedulingRequirements) {
+ slotExecutionVertexAssignments.add(
+ createAndRegisterSlotExecutionVertexAssignment(
+ schedulingRequirements.getExecutionVertexId(),
+ new CompletableFuture<>(),
+ throwable -> {}));
+ }
+
+ return slotExecutionVertexAssignments;
+ }
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
index ad74357..0646a32 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
@@ -49,18 +49,12 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
import static org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorTestUtils.createSchedulingRequirements;
import static org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorTestUtils.findSlotAssignmentByExecutionVertexId;
import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
@@ -111,7 +105,7 @@ public class DefaultExecutionSlotAllocatorTest extends TestLogger {
inputsLocationsRetriever.assignTaskManagerLocation(producerId);
assertTrue(consumerSlotAssignment.getLogicalSlotFuture().isDone());
- assertEquals(0, executionSlotAllocator.getNumberOfPendingSlotAssignments());
+ assertThat(executionSlotAllocator.getPendingSlotAssignments().keySet(), hasSize(0));
}
/**
@@ -160,109 +154,47 @@ public class DefaultExecutionSlotAllocatorTest extends TestLogger {
assertThat(expectedSlotProfile.getPreferredLocations(), contains(taskManagerLocation));
}
- /**
- * Tests that cancels an execution vertex which is not existed.
- */
@Test
- public void testCancelNonExistingExecutionVertex() {
- final DefaultExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator();
-
- ExecutionVertexID inValidExecutionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
- executionSlotAllocator.cancel(inValidExecutionVertexId);
-
- assertThat(slotProvider.getCancelledSlotRequestIds(), is(empty()));
- }
-
- /**
- * Tests that cancels a slot request which has already been fulfilled.
- */
- @Test
- public void testCancelFulfilledSlotRequest() {
- final ExecutionVertexID producerId = new ExecutionVertexID(new JobVertexID(), 0);
+ public void testDuplicatedSlotAllocationIsNotAllowed() {
+ final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
final DefaultExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator();
+ slotProvider.disableSlotAllocation();
final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
- createSchedulingRequirements(producerId);
+ createSchedulingRequirements(executionVertexId);
executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
- executionSlotAllocator.cancel(producerId);
-
- assertThat(slotProvider.getCancelledSlotRequestIds(), is(empty()));
+ try {
+ executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+ fail("exception should happen");
+ } catch (IllegalStateException e) {
+ // IllegalStateException is expected
+ }
}
- /**
- * Tests that cancels a slot request which has not been fulfilled.
- */
@Test
- public void testCancelUnFulfilledSlotRequest() throws Exception {
- final ExecutionVertexID producerId = new ExecutionVertexID(new JobVertexID(), 0);
-
+ public void testSlotAssignmentIsProperlyRegistered() {
final DefaultExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator();
- slotProvider.disableSlotAllocation();
+ final ExecutionVertexID executionVertexID = new ExecutionVertexID(new JobVertexID(), 0);
final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
- createSchedulingRequirements(producerId);
- Collection<SlotExecutionVertexAssignment> assignments = executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
-
- executionSlotAllocator.cancel(producerId);
-
- assertThat(slotProvider.getCancelledSlotRequestIds(), hasSize(1));
- assertThat(slotProvider.getCancelledSlotRequestIds(), contains(slotProvider.getReceivedSlotRequestIds().toArray()));
-
- try {
- assignments.iterator().next().getLogicalSlotFuture().get();
- fail("Expect a CancellationException but got nothing.");
- } catch (CancellationException ignored) {
- // Expected exception
- }
- }
+ createSchedulingRequirements(executionVertexID);
- /**
- * Tests that all prior allocation ids are computed by union all previous allocation ids in scheduling requirements.
- */
- @Test
- public void testComputeAllPriorAllocationIds() {
- List<AllocationID> expectAllocationIds = Arrays.asList(new AllocationID(), new AllocationID());
- List<ExecutionVertexSchedulingRequirements> testSchedulingRequirements = Arrays.asList(
- new ExecutionVertexSchedulingRequirements.Builder().
- withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 0)).
- withPreviousAllocationId(expectAllocationIds.get(0)).
- build(),
- new ExecutionVertexSchedulingRequirements.Builder().
- withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 1)).
- withPreviousAllocationId(expectAllocationIds.get(0)).
- build(),
- new ExecutionVertexSchedulingRequirements.Builder().
- withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 2)).
- withPreviousAllocationId(expectAllocationIds.get(1)).
- build(),
- new ExecutionVertexSchedulingRequirements.Builder().
- withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 3)).
- build()
- );
+ slotProvider.disableSlotAllocation();
+ final Collection<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
+ executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
- Set<AllocationID> allPriorAllocationIds = DefaultExecutionSlotAllocator.computeAllPriorAllocationIds(testSchedulingRequirements);
- assertThat(allPriorAllocationIds, containsInAnyOrder(expectAllocationIds.toArray()));
- }
+ final SlotExecutionVertexAssignment slotAssignment = slotExecutionVertexAssignments.iterator().next();
- @Test
- public void testDuplicatedSlotAllocationIsNotAllowed() {
- final ExecutionVertexID executionVertexId = new ExecutionVertexID(new JobVertexID(), 0);
+ assertThat(executionSlotAllocator.getPendingSlotAssignments().values(), contains(slotAssignment));
- final DefaultExecutionSlotAllocator executionSlotAllocator = createExecutionSlotAllocator();
- slotProvider.disableSlotAllocation();
+ executionSlotAllocator.cancel(executionVertexID);
- final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
- createSchedulingRequirements(executionVertexId);
- executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
+ assertThat(executionSlotAllocator.getPendingSlotAssignments().keySet(), hasSize(0));
- try {
- executionSlotAllocator.allocateSlotsFor(schedulingRequirements);
- fail("exception should happen");
- } catch (IllegalStateException e) {
- // IllegalStateException is expected
- }
+ final SlotRequestId slotRequestId = slotProvider.slotAllocationRequests.get(0).f0;
+ assertThat(slotProvider.getCancelledSlotRequestIds(), contains(slotRequestId));
}
private DefaultExecutionSlotAllocator createExecutionSlotAllocator() {
@@ -317,18 +249,12 @@ public class DefaultExecutionSlotAllocatorTest extends TestLogger {
return Collections.unmodifiableList(slotAllocationRequests);
}
- public List<SlotRequestId> getReceivedSlotRequestIds() {
- return slotAllocationRequests.stream()
- .map(requestTuple -> requestTuple.f0)
- .collect(Collectors.toList());
- }
-
- public List<SlotRequestId> getCancelledSlotRequestIds() {
- return Collections.unmodifiableList(cancelledSlotRequestIds);
- }
-
public void disableSlotAllocation() {
slotAllocationDisabled = true;
}
+
+ List<SlotRequestId> getCancelledSlotRequestIds() {
+ return cancelledSlotRequestIds;
+ }
}
}