You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2020/09/30 13:27:19 UTC

[flink] branch master updated: [FLINK-19388][coordination] Do not remove logical slots from SharedSlot if it is releasing

This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d50daa  [FLINK-19388][coordination] Do not remove logical slots from SharedSlot if it is releasing
8d50daa is described below

commit 8d50daa7e13e00cffbd09efac73e6bea0e48ee5c
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Tue Sep 29 14:09:29 2020 +0200

    [FLINK-19388][coordination] Do not remove logical slots from SharedSlot if it is releasing
    
    `SharedSlot#release` releases all logical slots in a loop over their collection.
    Releasing a logical slot fails the execution assigned to it.
    The `Scheduler` may react to such an execution failure by cancelling other executions,
    including executions that share the same slot.
    The cancelled executions subsequently call `SharedSlot#returnLogicalSlot`,
    which modifies the logical slot collection while it is still being iterated in `SharedSlot#release`
    if the cancelled executions share the same slot. This leads to a `ConcurrentModificationException`.
    
    To avoid the `ConcurrentModificationException`, the logical slot collection is copied before iterating over it.
    
    This closes #13511.
---
 .../apache/flink/runtime/scheduler/SharedSlot.java | 21 +++++++++++++----
 .../flink/runtime/scheduler/SharedSlotTest.java    | 27 +++++++++++++++++++++-
 2 files changed, 43 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
index 1d55bbd..1525060 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
@@ -35,9 +35,11 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * Shared slot implementation for the {@link SlotSharingExecutionSlotAllocator}.
@@ -203,6 +205,7 @@ class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
 	}
 
 	private void removeLogicalSlotRequest(SlotRequestId logicalSlotRequestId) {
+		LOG.debug("Remove {}", getLogicalSlotString(logicalSlotRequestId));
 		Preconditions.checkState(
 			requestedLogicalSlots.removeKeyB(logicalSlotRequestId) != null,
 			"Trying to remove a logical slot request which has been either already removed or never created.");
@@ -215,10 +218,19 @@ class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
 			slotContextFuture.isDone(),
 			"Releasing of the shared slot is expected only from its successfully allocated physical slot ({})",
 			physicalSlotRequestId);
-		for (ExecutionVertexID executionVertexId : requestedLogicalSlots.keySetA()) {
-			LOG.debug("Release {}", getLogicalSlotString(executionVertexId));
+		LOG.debug("Release shared slot ({})", physicalSlotRequestId);
+
+		// copy the logical slot collection to avoid ConcurrentModificationException
+		// if logical slot releases cause cancellation of other executions
+		// which will try to call returnLogicalSlot and modify requestedLogicalSlots collection
+		Map<ExecutionVertexID, CompletableFuture<SingleLogicalSlot>> logicalSlotFutures = requestedLogicalSlots
+			.keySetA()
+			.stream()
+			.collect(Collectors.toMap(executionVertexId -> executionVertexId, requestedLogicalSlots::getValueByKeyA));
+		for (Map.Entry<ExecutionVertexID, CompletableFuture<SingleLogicalSlot>> entry : logicalSlotFutures.entrySet()) {
+			LOG.debug("Release {}", getLogicalSlotString(entry.getKey()));
 			CompletableFuture<SingleLogicalSlot> logicalSlotFuture =
-				requestedLogicalSlots.getValueByKeyA(executionVertexId);
+				entry.getValue();
 			Preconditions.checkNotNull(logicalSlotFuture);
 			Preconditions.checkState(
 				logicalSlotFuture.isDone(),
@@ -231,8 +243,9 @@ class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
 	}
 
 	private void releaseExternally() {
-		if (state == State.ALLOCATED && requestedLogicalSlots.values().isEmpty()) {
+		if (state != State.RELEASED && requestedLogicalSlots.values().isEmpty()) {
 			state = State.RELEASED;
+			LOG.debug("Release shared slot externally ({})", physicalSlotRequestId);
 			externalReleaseCallback.accept(executionSlotSharingGroup);
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTest.java
index da5091c..afb5b3e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTest.java
@@ -50,7 +50,8 @@ import static org.junit.Assert.fail;
  */
 public class SharedSlotTest extends TestLogger {
 	private static final ExecutionVertexID EV1 = createRandomExecutionVertexId();
-	private static final ExecutionSlotSharingGroup SG = createExecutionSlotSharingGroup(EV1);
+	private static final ExecutionVertexID EV2 = createRandomExecutionVertexId();
+	private static final ExecutionSlotSharingGroup SG = createExecutionSlotSharingGroup(EV1, EV2);
 	private static final SlotRequestId PHYSICAL_SLOT_REQUEST_ID = new SlotRequestId();
 	private static final ResourceProfile RP = ResourceProfile.newBuilder().setCpuCores(2.0).build();
 
@@ -285,6 +286,30 @@ public class SharedSlotTest extends TestLogger {
 		assertThat(released.get(), is(1));
 	}
 
+	@Test
+	public void testReturnLogicalSlotWhileReleasingDoesNotCauseConcurrentModificationException() {
+		CompletableFuture<PhysicalSlot> slotContextFuture = CompletableFuture
+			.completedFuture(new TestingPhysicalSlot(RP, new AllocationID()));
+		SharedSlot sharedSlot = SharedSlotBuilder
+			.newBuilder()
+			.withSlotContextFuture(slotContextFuture)
+			.build();
+		LogicalSlot logicalSlot1 = sharedSlot.allocateLogicalSlot(EV1).join();
+		LogicalSlot logicalSlot2 = sharedSlot.allocateLogicalSlot(EV2).join();
+		logicalSlot1.tryAssignPayload(new LogicalSlot.Payload() {
+			@Override
+			public void fail(Throwable cause) {
+				sharedSlot.returnLogicalSlot(logicalSlot2);
+			}
+
+			@Override
+			public CompletableFuture<?> getTerminalStateFuture() {
+				return CompletableFuture.completedFuture(null);
+			}
+		});
+		sharedSlot.release(new Throwable());
+	}
+
 	private static class SharedSlotBuilder {
 		private CompletableFuture<PhysicalSlot> slotContextFuture = new CompletableFuture<>();
 		private boolean slotWillBeOccupiedIndefinitely = false;