You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/07/24 14:00:22 UTC
flink git commit: [FLINK-2384] [runtime] Move blocking I/O call
outside of synchronized block
Repository: flink
Updated Branches:
refs/heads/master 9b1343d7f -> efca79cfb
[FLINK-2384] [runtime] Move blocking I/O call outside of synchronized block
Problem: Waiting on asynchronous write requests while holding the partition
lock can result in a deadlock, because all other operations on the same
partition are blocked. It is possible that the I/O writer itself needs to
access the partition, in which case the whole program blocks.
Solution: Move the wait outside the synchronized block. Holding the lock during
the wait was not necessary in the first place, because no operation assumes the
spilling to be finished when the finish call has returned.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/efca79cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/efca79cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/efca79cf
Branch: refs/heads/master
Commit: efca79cfb7b496b4bec70561cc94af069c644ef2
Parents: 9b1343d
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu Jul 23 15:19:57 2015 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Jul 24 13:59:53 2015 +0200
----------------------------------------------------------------------
.../partition/SpillableSubpartition.java | 10 +--
.../partition/SpillableSubpartitionTest.java | 74 +++++++++++++++++++-
2 files changed, 78 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/efca79cf/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 7ec24ac..91f2042 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -98,14 +98,14 @@ class SpillableSubpartition extends ResultSubpartition {
public void finish() throws IOException {
synchronized (buffers) {
if (add(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE))) {
- // If we are spilling/have spilled, wait for the writer to finish.
- if (spillWriter != null) {
- spillWriter.close();
- }
-
isFinished = true;
}
}
+
+ // If we are spilling/have spilled, wait for the writer to finish.
+ if (spillWriter != null) {
+ spillWriter.close();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/efca79cf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index 8c8692d..c530eff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -18,15 +18,27 @@
package org.apache.flink.runtime.io.network.partition;
+import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.junit.AfterClass;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
-import static org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode.*;
+import static org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode.SYNC;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class SpillableSubpartitionTest extends SubpartitionTestBase {
@@ -46,4 +58,64 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
ResultSubpartition createSubpartition() {
return new SpillableSubpartition(0, mock(ResultPartition.class), ioManager, SYNC);
}
+
+
+ /**
+ * Tests a fix for FLINK-2384.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/FLINK-2384">FLINK-2384</a>
+ */
+ @Test
+ public void testConcurrentFinishAndReleaseMemory() throws Exception {
+ // Latches to blocking
+ final CountDownLatch doneLatch = new CountDownLatch(1);
+ final CountDownLatch blockLatch = new CountDownLatch(1);
+
+ // Blocking spill writer (blocks on the close call)
+ AsynchronousBufferFileWriter spillWriter = mock(AsynchronousBufferFileWriter.class);
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ blockLatch.countDown();
+ doneLatch.await();
+ return null;
+ }
+ }).when(spillWriter).close();
+
+ // Mock I/O manager returning the blocking spill writer
+ IOManager ioManager = mock(IOManager.class);
+ when(ioManager.createBufferFileWriter(any(FileIOChannel.ID.class)))
+ .thenReturn(spillWriter);
+
+ // The partition
+ final SpillableSubpartition partition = new SpillableSubpartition(
+ 0, mock(ResultPartition.class), ioManager, SYNC);
+
+ // Spill the partition initially (creates the spill writer)
+ partition.releaseMemory();
+
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ // Finish the partition (this blocks because of the mock blocking writer)
+ Future<Void> blockingFinish = executor.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ partition.finish();
+ return null;
+ }
+ });
+
+ // Ensure that the blocking call has been made
+ blockLatch.await();
+
+ // This call needs to go through. FLINK-2384 discovered a bug, in
+ // which the finish call was holding a lock, which was leading to a
+ // deadlock when another operation on the partition was happening.
+ partition.releaseMemory();
+
+ // Check that the finish call succeeded w/o problems as well to avoid
+ // false test successes.
+ doneLatch.countDown();
+ blockingFinish.get();
+ }
}