You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/02 14:18:07 UTC
flink git commit: [FLINK-5228] [network] Fix LocalInputChannel re-trigger request and release deadlock
Repository: flink
Updated Branches:
refs/heads/release-1.1 0dc82baa0 -> 388acbca9
[FLINK-5228] [network] Fix LocalInputChannel re-trigger request and release deadlock
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/388acbca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/388acbca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/388acbca
Branch: refs/heads/release-1.1
Commit: 388acbca98200f13b0c016f9b79d900e355dc5ef
Parents: 0dc82ba
Author: Ufuk Celebi <uc...@apache.org>
Authored: Fri Dec 2 12:52:58 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Dec 2 15:17:55 2016 +0100
----------------------------------------------------------------------
.../partition/consumer/LocalInputChannel.java | 38 ++++----
.../consumer/LocalInputChannelTest.java | 92 +++++++++++++++++++-
2 files changed, 114 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/388acbca/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 0a02ea1..51748b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -49,7 +49,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
// ------------------------------------------------------------------------
- private final Object requestReleaseLock = new Object();
+ private final Object requestLock = new Object();
/** The local partition manager. */
private final ResultPartitionManager partitionManager;
@@ -99,9 +99,12 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
@Override
void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
+
+ boolean retriggerRequest = false;
+
// The lock is required to request only once in the presence of retriggered requests.
- synchronized (requestReleaseLock) {
- checkState(!isReleased, "released");
+ synchronized (requestLock) {
+ checkState(!isReleased, "LocalInputChannel has been released already");
if (subpartitionView == null) {
LOG.debug("{}: Requesting LOCAL subpartition {} of partition {}.",
@@ -125,20 +128,27 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
}
} catch (PartitionNotFoundException notFound) {
if (increaseBackoff()) {
- inputGate.retriggerPartitionRequest(partitionId.getPartitionId());
+ retriggerRequest = true;
} else {
throw notFound;
}
}
}
}
+
+ // Do this outside of the lock scope as this might lead to a
+ // deadlock with a concurrent release of the channel via the
+ // input gate.
+ if (retriggerRequest) {
+ inputGate.retriggerPartitionRequest(partitionId.getPartitionId());
+ }
}
/**
* Retriggers a subpartition request.
*/
void retriggerSubpartitionRequest(Timer timer, final int subpartitionIndex) {
- synchronized (requestReleaseLock) {
+ synchronized (requestLock) {
checkState(subpartitionView == null, "already requested partition");
timer.schedule(new TimerTask() {
@@ -193,7 +203,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
// synchronizing on the request lock means this blocks until the asynchronous request
// for the partition view has been completed
// by then the subpartition view is visible or the channel is released
- synchronized (requestReleaseLock) {
+ synchronized (requestLock) {
checkState(!isReleased, "released");
checkState(subpartitionView != null, "Queried for a buffer before requesting the subpartition.");
return subpartitionView;
@@ -231,19 +241,17 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
}
/**
- * Releases the look ahead {@link Buffer} instance and discards the queue
- * iterator.
+ * Releases the partition reader
*/
@Override
void releaseAllResources() throws IOException {
- synchronized (requestReleaseLock) {
- if (!isReleased) {
- isReleased = true;
+ if (!isReleased) {
+ isReleased = true;
- if (subpartitionView != null) {
- subpartitionView.releaseAllResources();
- subpartitionView = null;
- }
+ ResultSubpartitionView view = subpartitionView;
+ if (view != null) {
+ view.releaseAllResources();
+ subpartitionView = null;
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/388acbca/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 9b36ea9..974ac9f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -29,8 +29,8 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
-import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -254,6 +254,96 @@ public class LocalInputChannelTest {
ch.getNextBuffer();
}
+ /**
+ * Verifies that concurrent release via the SingleInputGate and re-triggering
+ * of a partition request works smoothly.
+ *
+ * - SingleInputGate acquires its request lock and tries to release all
+ * registered channels. When releasing a channel, it needs to acquire
+ * the channel's shared request-release lock.
+ * - If a LocalInputChannel concurrently retriggers a partition request via
+ * a Timer Thread, it acquires the channel's request-release lock and calls
+ * the retrigger callback on the SingleInputGate, which again tries to
+ * acquire the gate's request lock.
+ *
+ * For certain timings this obviously leads to a deadlock. This test reliably
+ * reproduced such a timing (reported in FLINK-5228). This test is pretty much
+ * testing the buggy implementation and has little general value. If it
+ * becomes obsolete at some point (future greatness ;)), feel free to remove it.
+ *
+ * The fix was to not acquire the channel's lock when releasing it and to not
+ * trigger any input gate callbacks while holding the channel's lock (both are
+ * done). The timeout makes a reintroduced deadlock fail instead of hang.
+ */
+ @Test(timeout = 10000)
+ public void testConcurrentReleaseAndRetriggerPartitionRequest() throws Exception {
+ final SingleInputGate gate = new SingleInputGate(
+ "test task name",
+ new JobID(),
+ new ExecutionAttemptID(),
+ new IntermediateDataSetID(),
+ 0,
+ 1,
+ mock(PartitionStateChecker.class),
+ new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()
+ );
+
+ ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
+ when(partitionManager
+ .createSubpartitionView(
+ any(ResultPartitionID.class),
+ anyInt(),
+ any(BufferProvider.class),
+ any(BufferAvailabilityListener.class)))
+ .thenAnswer(new Answer<ResultSubpartitionView>() {
+ @Override
+ public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable {
+ // Sleep here a little to give the releaser Thread
+ // time to acquire the input gate lock. We throw
+ // the Exception to retrigger the request.
+ Thread.sleep(100);
+ throw new PartitionNotFoundException(new ResultPartitionID());
+ }
+ });
+
+ final LocalInputChannel channel = new LocalInputChannel(
+ gate,
+ 0,
+ new ResultPartitionID(),
+ partitionManager,
+ new TaskEventDispatcher(),
+ new Tuple2<>(1, 1),
+ new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+
+ gate.setInputChannel(new IntermediateResultPartitionID(), channel);
+
+ Thread releaser = new Thread() {
+ @Override
+ public void run() {
+ try {
+ gate.releaseAllResources();
+ } catch (IOException ignored) {
+ }
+ }
+ };
+
+ Thread requester = new Thread() {
+ @Override
+ public void run() {
+ try {
+ channel.requestSubpartition(0);
+ } catch (IOException | InterruptedException ignored) {
+ }
+ }
+ };
+
+ requester.start();
+ releaser.start();
+
+ releaser.join();
+ requester.join();
+ }
+
// ---------------------------------------------------------------------------------------------
private LocalInputChannel createLocalInputChannel(