You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/07/24 14:00:22 UTC

flink git commit: [FLINK-2384] [runtime] Move blocking I/O call outside of synchronized block

Repository: flink
Updated Branches:
  refs/heads/master 9b1343d7f -> efca79cfb


[FLINK-2384] [runtime] Move blocking I/O call outside of synchronized block

Problem: Waiting on asynchronous write requests while holding the partition
lock can result in a deadlock, because all other operations on the same
partition are blocked. It is possible that the I/O writer itself needs to
access the partition, in which case the whole program blocks.

Solution: Move the wait outside the synchronized block. Holding the lock
while waiting was never necessary, because no operation assumes the spilling
to be finished when the finish call has returned.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/efca79cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/efca79cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/efca79cf

Branch: refs/heads/master
Commit: efca79cfb7b496b4bec70561cc94af069c644ef2
Parents: 9b1343d
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu Jul 23 15:19:57 2015 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Jul 24 13:59:53 2015 +0200

----------------------------------------------------------------------
 .../partition/SpillableSubpartition.java        | 10 +--
 .../partition/SpillableSubpartitionTest.java    | 74 +++++++++++++++++++-
 2 files changed, 78 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/efca79cf/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 7ec24ac..91f2042 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -98,14 +98,14 @@ class SpillableSubpartition extends ResultSubpartition {
 	public void finish() throws IOException {
 		synchronized (buffers) {
 			if (add(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE))) {
-				// If we are spilling/have spilled, wait for the writer to finish.
-				if (spillWriter != null) {
-					spillWriter.close();
-				}
-
 				isFinished = true;
 			}
 		}
+
+		// If we are spilling/have spilled, wait for the writer to finish.
+		if (spillWriter != null) {
+			spillWriter.close();
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/efca79cf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index 8c8692d..c530eff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -18,15 +18,27 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.junit.AfterClass;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
-import static org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode.*;
+import static org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode.SYNC;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class SpillableSubpartitionTest extends SubpartitionTestBase {
 
@@ -46,4 +58,64 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 	ResultSubpartition createSubpartition() {
 		return new SpillableSubpartition(0, mock(ResultPartition.class), ioManager, SYNC);
 	}
+
+
+	/**
+	 * Tests a fix for FLINK-2384.
+	 *
+	 * @see <a href="https://issues.apache.org/jira/browse/FLINK-2384">FLINK-2384</a>
+	 */
+	@Test
+	public void testConcurrentFinishAndReleaseMemory() throws Exception {
+		// Latches to blocking
+		final CountDownLatch doneLatch = new CountDownLatch(1);
+		final CountDownLatch blockLatch = new CountDownLatch(1);
+
+		// Blocking spill writer (blocks on the close call)
+		AsynchronousBufferFileWriter spillWriter = mock(AsynchronousBufferFileWriter.class);
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) throws Throwable {
+				blockLatch.countDown();
+				doneLatch.await();
+				return null;
+			}
+		}).when(spillWriter).close();
+
+		// Mock I/O manager returning the blocking spill writer
+		IOManager ioManager = mock(IOManager.class);
+		when(ioManager.createBufferFileWriter(any(FileIOChannel.ID.class)))
+				.thenReturn(spillWriter);
+
+		// The partition
+		final SpillableSubpartition partition = new SpillableSubpartition(
+				0, mock(ResultPartition.class), ioManager, SYNC);
+
+		// Spill the partition initially (creates the spill writer)
+		partition.releaseMemory();
+
+		ExecutorService executor = Executors.newSingleThreadExecutor();
+
+		// Finish the partition (this blocks because of the mock blocking writer)
+		Future<Void> blockingFinish = executor.submit(new Callable<Void>() {
+			@Override
+			public Void call() throws Exception {
+				partition.finish();
+				return null;
+			}
+		});
+
+		// Ensure that the blocking call has been made
+		blockLatch.await();
+
+		// This call needs to go through. FLINK-2384 discovered a bug, in
+		// which the finish call was holding a lock, which was leading to a
+		// deadlock when another operation on the partition was happening.
+		partition.releaseMemory();
+
+		// Check that the finish call succeeded w/o problems as well to avoid
+		// false test successes.
+		doneLatch.countDown();
+		blockingFinish.get();
+	}
 }