You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/08/02 07:59:22 UTC
[flink] branch master updated: [FLINK-13421] Exclude releasing root slots from slot allocation
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3841129 [FLINK-13421] Exclude releasing root slots from slot allocation
3841129 is described below
commit 38411298a2fa71194cd7d81405ee8f6b069f377f
Author: zhuzhu.zz <zh...@alibaba-inc.com>
AuthorDate: Wed Jul 31 15:01:33 2019 +0800
[FLINK-13421] Exclude releasing root slots from slot allocation
Make a MultiTaskSlot unavailable for allocation while it is releasing its children,
to avoid a ConcurrentModificationException.
This closes #9288.
---
.../jobmaster/slotpool/SlotSharingManager.java | 22 +++++----
.../executiongraph/ExecutionGraphRestartTest.java | 40 ++++++++++++++++
.../jobmaster/slotpool/SlotSharingManagerTest.java | 56 ++++++++++++++++++++++
3 files changed, 110 insertions(+), 8 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index b2aeed3..f7dd18b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -51,6 +51,7 @@ import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
@@ -185,7 +186,7 @@ public class SlotSharingManager {
.values()
.stream()
.flatMap((Map<AllocationID, MultiTaskSlot> map) -> map.values().stream())
- .filter((MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId))
+ .filter(validMultiTaskSlotAndDoesNotContain(groupId))
.map((MultiTaskSlot multiTaskSlot) -> {
SlotInfo slotInfo = multiTaskSlot.getSlotContextFuture().join();
return new SlotSelectionStrategy.SlotInfoAndResources(
@@ -194,6 +195,10 @@ public class SlotSharingManager {
}).collect(Collectors.toList());
}
+ private Predicate<MultiTaskSlot> validMultiTaskSlotAndDoesNotContain(@Nullable AbstractID groupId) {
+ return (MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId) && !multiTaskSlot.isReleasing();
+ }
+
@Nullable
public MultiTaskSlot getResolvedRootSlot(@Nonnull SlotInfo slotInfo) {
Map<AllocationID, MultiTaskSlot> forLocationEntry = resolvedRootSlots.get(slotInfo.getTaskManagerLocation());
@@ -209,13 +214,10 @@ public class SlotSharingManager {
*/
@Nullable
MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) {
- for (MultiTaskSlot multiTaskSlot : unresolvedRootSlots.values()) {
- if (!multiTaskSlot.contains(groupId)) {
- return multiTaskSlot;
- }
- }
-
- return null;
+ return unresolvedRootSlots.values().stream()
+ .filter(validMultiTaskSlotAndDoesNotContain(groupId))
+ .findFirst()
+ .orElse(null);
}
@Override
@@ -627,6 +629,10 @@ public class SlotSharingManager {
}
}
+ boolean isReleasing() {
+ return releasingChildren;
+ }
+
@Override
public String toString() {
String physicalSlotDescription;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index bf312a7..a885398 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
@@ -553,6 +554,45 @@ public class ExecutionGraphRestartTest extends TestLogger {
}
}
+ /**
+ * SlotPool#failAllocation should not fail with a {@link java.util.ConcurrentModificationException}
+ * if there is a concurrent scheduling operation. See FLINK-13421.
+ */
+ @Test
+ public void slotPoolExecutionGraph_ConcurrentSchedulingAndAllocationFailure_ShouldNotFailWithConcurrentModificationException() throws Exception {
+ final SlotSharingGroup group = new SlotSharingGroup();
+ final JobVertex vertex1 = createNoOpVertex("vertex1", 1);
+ vertex1.setSlotSharingGroup(group);
+ final JobVertex vertex2 = createNoOpVertex("vertex2", 3);
+ vertex2.setSlotSharingGroup(group);
+ final JobVertex vertex3 = createNoOpVertex("vertex3", 1);
+ vertex3.setSlotSharingGroup(group);
+ vertex3.connectNewDataSetAsInput(vertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+ try (SlotPool slotPool = createSlotPoolImpl()) {
+ final SlotProvider slots = createSchedulerWithSlots(2, slotPool, new LocalTaskManagerLocation());
+
+ final AllocationID allocationId = slotPool.getAvailableSlotsInformation().iterator().next().getAllocationId();
+
+ final ExecutionGraph eg = new ExecutionGraphTestUtils.TestingExecutionGraphBuilder(TEST_JOB_ID, vertex1, vertex2, vertex3)
+ .setSlotProvider(slots)
+ .setAllocationTimeout(Time.minutes(60))
+ .setScheduleMode(ScheduleMode.EAGER)
+ .setAllowQueuedScheduling(true)
+ .build();
+
+ eg.start(mainThreadExecutor);
+
+ eg.scheduleForExecution();
+
+ slotPool.failAllocation(
+ allocationId,
+ new Exception("test exception"));
+
+ eg.waitUntilTerminal();
+ }
+ }
+
@Test
public void testRestartWithEagerSchedulingAndSlotSharing() throws Exception {
// this test is inconclusive if not used with a proper multi-threaded executor
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
index ecf2e4e..04ef8b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
@@ -51,6 +51,7 @@ import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
@@ -524,6 +525,61 @@ public class SlotSharingManagerTest extends TestLogger {
assertEquals(rootSlot1.getSlotRequestId(), resolvedRootSlot.getSlotRequestId());
}
+ /**
+ * Tests that we cannot retrieve a slot when it's releasing children.
+ */
+ @Test
+ public void testResolvedSlotInReleasingIsNotAvailable() throws Exception {
+ final TestingAllocatedSlotActions allocatedSlotActions = new TestingAllocatedSlotActions();
+
+ final SlotSharingManager slotSharingManager = new SlotSharingManager(
+ SLOT_SHARING_GROUP_ID,
+ allocatedSlotActions,
+ SLOT_OWNER);
+
+ final SlotSharingManager.MultiTaskSlot rootSlot = slotSharingManager.createRootSlot(
+ new SlotRequestId(),
+ CompletableFuture.completedFuture(
+ new SimpleSlotContext(
+ new AllocationID(),
+ new LocalTaskManagerLocation(),
+ 0,
+ new SimpleAckingTaskManagerGateway())),
+ new SlotRequestId());
+
+ final AbstractID groupId1 = new AbstractID();
+ final SlotSharingManager.SingleTaskSlot singleTaskSlot = rootSlot.allocateSingleTaskSlot(
+ new SlotRequestId(),
+ ResourceProfile.UNKNOWN,
+ groupId1,
+ Locality.UNCONSTRAINED);
+
+ final AtomicBoolean verified = new AtomicBoolean(false);
+
+ final AbstractID groupId2 = new AbstractID();
+ // register a verification in MultiTaskSlot's children releasing loop
+ singleTaskSlot.getLogicalSlotFuture().get().tryAssignPayload(new LogicalSlot.Payload() {
+ @Override
+ public void fail(Throwable cause) {
+ assertEquals(0, slotSharingManager.listResolvedRootSlotInfo(groupId2).size());
+
+ verified.set(true);
+ }
+
+ @Override
+ public CompletableFuture<?> getTerminalStateFuture() {
+ return null;
+ }
+ });
+
+ assertEquals(1, slotSharingManager.listResolvedRootSlotInfo(groupId2).size());
+
+ rootSlot.release(new Exception("test exception"));
+
+ // ensure the verification in Payload#fail is passed
+ assertTrue(verified.get());
+ }
+
@Test
public void testGetUnresolvedSlot() {
final TestingAllocatedSlotActions allocatedSlotActions = new TestingAllocatedSlotActions();