You are viewing a plain-text version of this content. (The link to the canonical version is not included in this plain-text copy.)
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/02 14:18:07 UTC

flink git commit: [FLINK-5228] [network] Fix LocalInputChannel re-trigger request and release deadlock

Repository: flink
Updated Branches:
  refs/heads/release-1.1 0dc82baa0 -> 388acbca9


[FLINK-5228] [network] Fix LocalInputChannel re-trigger request and release deadlock


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/388acbca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/388acbca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/388acbca

Branch: refs/heads/release-1.1
Commit: 388acbca98200f13b0c016f9b79d900e355dc5ef
Parents: 0dc82ba
Author: Ufuk Celebi <uc...@apache.org>
Authored: Fri Dec 2 12:52:58 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Dec 2 15:17:55 2016 +0100

----------------------------------------------------------------------
 .../partition/consumer/LocalInputChannel.java   | 38 ++++----
 .../consumer/LocalInputChannelTest.java         | 92 +++++++++++++++++++-
 2 files changed, 114 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/388acbca/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 0a02ea1..51748b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -49,7 +49,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
 
 	// ------------------------------------------------------------------------
 
-	private final Object requestReleaseLock = new Object();
+	private final Object requestLock = new Object();
 
 	/** The local partition manager. */
 	private final ResultPartitionManager partitionManager;
@@ -99,9 +99,12 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
 
 	@Override
 	void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
+
+		boolean retriggerRequest = false;
+
 		// The lock is required to request only once in the presence of retriggered requests.
-		synchronized (requestReleaseLock) {
-			checkState(!isReleased, "released");
+		synchronized (requestLock) {
+			checkState(!isReleased, "LocalInputChannel has been released already");
 
 			if (subpartitionView == null) {
 				LOG.debug("{}: Requesting LOCAL subpartition {} of partition {}.",
@@ -125,20 +128,27 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
 					}
 				} catch (PartitionNotFoundException notFound) {
 					if (increaseBackoff()) {
-						inputGate.retriggerPartitionRequest(partitionId.getPartitionId());
+						retriggerRequest = true;
 					} else {
 						throw notFound;
 					}
 				}
 			}
 		}
+
+		// Do this outside of the lock scope as this might lead to a
+		// deadlock with a concurrent release of the channel via the
+		// input gate.
+		if (retriggerRequest) {
+			inputGate.retriggerPartitionRequest(partitionId.getPartitionId());
+		}
 	}
 
 	/**
 	 * Retriggers a subpartition request.
 	 */
 	void retriggerSubpartitionRequest(Timer timer, final int subpartitionIndex) {
-		synchronized (requestReleaseLock) {
+		synchronized (requestLock) {
 			checkState(subpartitionView == null, "already requested partition");
 
 			timer.schedule(new TimerTask() {
@@ -193,7 +203,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
 		// synchronizing on the request lock means this blocks until the asynchronous request
 		// for the partition view has been completed
 		// by then the subpartition view is visible or the channel is released
-		synchronized (requestReleaseLock) {
+		synchronized (requestLock) {
 			checkState(!isReleased, "released");
 			checkState(subpartitionView != null, "Queried for a buffer before requesting the subpartition.");
 			return subpartitionView;
@@ -231,19 +241,17 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
 	}
 
 	/**
-	 * Releases the look ahead {@link Buffer} instance and discards the queue
-	 * iterator.
+	 * Releases the partition reader
 	 */
 	@Override
 	void releaseAllResources() throws IOException {
-		synchronized (requestReleaseLock) {
-			if (!isReleased) {
-				isReleased = true;
+		if (!isReleased) {
+			isReleased = true;
 
-				if (subpartitionView != null) {
-					subpartitionView.releaseAllResources();
-					subpartitionView = null;
-				}
+			ResultSubpartitionView view = subpartitionView;
+			if (view != null) {
+				view.releaseAllResources();
+				subpartitionView = null;
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/388acbca/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 9b36ea9..974ac9f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -29,8 +29,8 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
-import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -254,6 +254,96 @@ public class LocalInputChannelTest {
 		ch.getNextBuffer();
 	}
 
+	/**
+	 * Verifies that concurrent release via the SingleInputGate and re-triggering
+	 * of a partition request works smoothly.
+	 *
+	 * - SingleInputGate acquires its request lock and tries to release all
+	 * registered channels. When releasing a channel, it needs to acquire
+	 * the channel's shared request-release lock.
+	 * - If a LocalInputChannel concurrently retriggers a partition request via
+	 * a Timer Thread it acquires the channel's request-release lock and calls
+	 * the retrigger callback on the SingleInputGate, which again tries to
+	 * acquire the gate's request lock.
+	 *
+	 * For certain timings this obviously leads to a deadlock. This test reliably
+	 * reproduced such a timing (reported in FLINK-5228). This test is pretty much
+	 * testing the buggy implementation and has not much more general value. If it
+	 * becomes obsolete at some point (future greatness ;)), feel free to remove it.
+	 *
+	 * The fix in the end was to not acquire the channel's lock when releasing it
+	 * and/or to not do any input gate callbacks while holding the channel's lock.
+	 * I decided to do both.
+	 */
+	@Test
+	public void testConcurrentReleaseAndRetriggerPartitionRequest() throws Exception {
+		final SingleInputGate gate = new SingleInputGate(
+			"test task name",
+			new JobID(),
+			new ExecutionAttemptID(),
+			new IntermediateDataSetID(),
+			0,
+			1,
+			mock(PartitionStateChecker.class),
+			new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()
+		);
+
+		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
+		when(partitionManager
+			.createSubpartitionView(
+				any(ResultPartitionID.class),
+				anyInt(),
+				any(BufferProvider.class),
+				any(BufferAvailabilityListener.class)))
+			.thenAnswer(new Answer<ResultSubpartitionView>() {
+				@Override
+				public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable {
+					// Sleep here a little to give the releaser Thread
+					// time to acquire the input gate lock. We throw
+					// the Exception to retrigger the request.
+					Thread.sleep(100);
+					throw new PartitionNotFoundException(new ResultPartitionID());
+				}
+			});
+
+		final LocalInputChannel channel = new LocalInputChannel(
+			gate,
+			0,
+			new ResultPartitionID(),
+			partitionManager,
+			new TaskEventDispatcher(),
+			new Tuple2<>(1, 1),
+			new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+
+		gate.setInputChannel(new IntermediateResultPartitionID(), channel);
+
+		Thread releaser = new Thread() {
+			@Override
+			public void run() {
+				try {
+					gate.releaseAllResources();
+				} catch (IOException ignored) {
+				}
+			}
+		};
+
+		Thread requester = new Thread() {
+			@Override
+			public void run() {
+				try {
+					channel.requestSubpartition(0);
+				} catch (IOException | InterruptedException ignored) {
+				}
+			}
+		};
+
+		requester.start();
+		releaser.start();
+
+		releaser.join();
+		requester.join();
+	}
+
 	// ---------------------------------------------------------------------------------------------
 
 	private LocalInputChannel createLocalInputChannel(