You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/08/02 07:59:22 UTC

[flink] branch master updated: [FLINK-13421] Exclude releasing root slots from slot allocation

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3841129  [FLINK-13421] Exclude releasing root slots from slot allocation
3841129 is described below

commit 38411298a2fa71194cd7d81405ee8f6b069f377f
Author: zhuzhu.zz <zh...@alibaba-inc.com>
AuthorDate: Wed Jul 31 15:01:33 2019 +0800

    [FLINK-13421] Exclude releasing root slots from slot allocation
    
    Make a MultiTaskSlot unavailable for allocation while it is releasing its
    children, to avoid a ConcurrentModificationException.
    
    This closes #9288.
---
 .../jobmaster/slotpool/SlotSharingManager.java     | 22 +++++----
 .../executiongraph/ExecutionGraphRestartTest.java  | 40 ++++++++++++++++
 .../jobmaster/slotpool/SlotSharingManagerTest.java | 56 ++++++++++++++++++++++
 3 files changed, 110 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index b2aeed3..f7dd18b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -51,6 +51,7 @@ import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 /**
@@ -185,7 +186,7 @@ public class SlotSharingManager {
 			.values()
 			.stream()
 				.flatMap((Map<AllocationID, MultiTaskSlot> map) -> map.values().stream())
-				.filter((MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId))
+				.filter(validMultiTaskSlotAndDoesNotContain(groupId))
 				.map((MultiTaskSlot multiTaskSlot) -> {
 					SlotInfo slotInfo = multiTaskSlot.getSlotContextFuture().join();
 					return new SlotSelectionStrategy.SlotInfoAndResources(
@@ -194,6 +195,10 @@ public class SlotSharingManager {
 				}).collect(Collectors.toList());
 	}
 
+	private Predicate<MultiTaskSlot> validMultiTaskSlotAndDoesNotContain(@Nullable AbstractID groupId) {
+		return (MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId) && !multiTaskSlot.isReleasing();
+	}
+
 	@Nullable
 	public MultiTaskSlot getResolvedRootSlot(@Nonnull SlotInfo slotInfo) {
 		Map<AllocationID, MultiTaskSlot> forLocationEntry = resolvedRootSlots.get(slotInfo.getTaskManagerLocation());
@@ -209,13 +214,10 @@ public class SlotSharingManager {
 	 */
 	@Nullable
 	MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) {
-		for (MultiTaskSlot multiTaskSlot : unresolvedRootSlots.values()) {
-			if (!multiTaskSlot.contains(groupId)) {
-				return multiTaskSlot;
-			}
-		}
-
-		return null;
+		return unresolvedRootSlots.values().stream()
+			.filter(validMultiTaskSlotAndDoesNotContain(groupId))
+			.findFirst()
+			.orElse(null);
 	}
 
 	@Override
@@ -627,6 +629,10 @@ public class SlotSharingManager {
 			}
 		}
 
+		boolean isReleasing() {
+			return releasingChildren;
+		}
+
 		@Override
 		public String toString() {
 			String physicalSlotDescription;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index bf312a7..a885398 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -553,6 +554,45 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * SlotPool#failAllocation should not fail with a {@link java.util.ConcurrentModificationException}
+	 * if there is a concurrent scheduling operation. See FLINK-13421.
+	 */
+	@Test
+	public void slotPoolExecutionGraph_ConcurrentSchedulingAndAllocationFailure_ShouldNotFailWithConcurrentModificationException() throws Exception {
+		final SlotSharingGroup group = new SlotSharingGroup();
+		final JobVertex vertex1 = createNoOpVertex("vertex1", 1);
+		vertex1.setSlotSharingGroup(group);
+		final JobVertex vertex2 = createNoOpVertex("vertex2", 3);
+		vertex2.setSlotSharingGroup(group);
+		final JobVertex vertex3 = createNoOpVertex("vertex3", 1);
+		vertex3.setSlotSharingGroup(group);
+		vertex3.connectNewDataSetAsInput(vertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+		try (SlotPool slotPool = createSlotPoolImpl()) {
+			final SlotProvider slots = createSchedulerWithSlots(2, slotPool, new LocalTaskManagerLocation());
+
+			final AllocationID allocationId = slotPool.getAvailableSlotsInformation().iterator().next().getAllocationId();
+
+			final ExecutionGraph eg = new ExecutionGraphTestUtils.TestingExecutionGraphBuilder(TEST_JOB_ID, vertex1, vertex2, vertex3)
+				.setSlotProvider(slots)
+				.setAllocationTimeout(Time.minutes(60))
+				.setScheduleMode(ScheduleMode.EAGER)
+				.setAllowQueuedScheduling(true)
+				.build();
+
+			eg.start(mainThreadExecutor);
+
+			eg.scheduleForExecution();
+
+			slotPool.failAllocation(
+				allocationId,
+				new Exception("test exception"));
+
+			eg.waitUntilTerminal();
+		}
+	}
+
 	@Test
 	public void testRestartWithEagerSchedulingAndSlotSharing() throws Exception {
 		// this test is inconclusive if not used with a proper multi-threaded executor
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
index ecf2e4e..04ef8b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
@@ -51,6 +51,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.CoreMatchers.nullValue;
@@ -524,6 +525,61 @@ public class SlotSharingManagerTest extends TestLogger {
 		assertEquals(rootSlot1.getSlotRequestId(), resolvedRootSlot.getSlotRequestId());
 	}
 
+	/**
+	 * Tests that a resolved root slot is not listed as available while it is releasing its children.
+	 */
+	@Test
+	public void testResolvedSlotInReleasingIsNotAvailable() throws Exception {
+		final TestingAllocatedSlotActions allocatedSlotActions = new TestingAllocatedSlotActions();
+
+		final SlotSharingManager slotSharingManager = new SlotSharingManager(
+			SLOT_SHARING_GROUP_ID,
+			allocatedSlotActions,
+			SLOT_OWNER);
+
+		final SlotSharingManager.MultiTaskSlot rootSlot = slotSharingManager.createRootSlot(
+			new SlotRequestId(),
+			CompletableFuture.completedFuture(
+				new SimpleSlotContext(
+					new AllocationID(),
+					new LocalTaskManagerLocation(),
+					0,
+					new SimpleAckingTaskManagerGateway())),
+			new SlotRequestId());
+
+		final AbstractID groupId1 = new AbstractID();
+		final SlotSharingManager.SingleTaskSlot singleTaskSlot = rootSlot.allocateSingleTaskSlot(
+			new SlotRequestId(),
+			ResourceProfile.UNKNOWN,
+			groupId1,
+			Locality.UNCONSTRAINED);
+
+		final AtomicBoolean verified = new AtomicBoolean(false);
+
+		final AbstractID groupId2 = new AbstractID();
+		// register a verification in MultiTaskSlot's children releasing loop
+		singleTaskSlot.getLogicalSlotFuture().get().tryAssignPayload(new LogicalSlot.Payload() {
+			@Override
+			public void fail(Throwable cause) {
+				assertEquals(0, slotSharingManager.listResolvedRootSlotInfo(groupId2).size());
+
+				verified.set(true);
+			}
+
+			@Override
+			public CompletableFuture<?> getTerminalStateFuture() {
+				return null;
+			}
+		});
+
+		assertEquals(1, slotSharingManager.listResolvedRootSlotInfo(groupId2).size());
+
+		rootSlot.release(new Exception("test exception"));
+
+		// ensure the verification in Payload#fail ran and passed
+		assertTrue(verified.get());
+	}
+
 	@Test
 	public void testGetUnresolvedSlot() {
 		final TestingAllocatedSlotActions allocatedSlotActions = new TestingAllocatedSlotActions();