You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/08/26 11:08:02 UTC

flink git commit: [FLINK-2460] [runtime] Check parent state in isReleased() check of partition view

Repository: flink
Updated Branches:
  refs/heads/master 5a9daca44 -> a17d4e823


[FLINK-2460] [runtime] Check parent state in isReleased() check of partition view

- Address PR comments

This closes #1051.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a17d4e82
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a17d4e82
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a17d4e82

Branch: refs/heads/master
Commit: a17d4e823fd08e8e49fde169dc3ddd264964d85a
Parents: 5a9daca
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Aug 10 15:15:07 2015 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Aug 26 11:07:03 2015 +0200

----------------------------------------------------------------------
 .../partition/PipelinedSubpartition.java        |  7 +++-
 .../network/partition/ResultSubpartition.java   |  2 +
 .../partition/SpillableSubpartition.java        |  7 +++-
 .../partition/SpillableSubpartitionView.java    |  4 +-
 .../SpilledSubpartitionViewAsyncIO.java         |  2 +-
 .../SpilledSubpartitionViewSyncIO.java          |  2 +-
 .../partition/PipelinedSubpartitionTest.java    |  4 ++
 .../network/partition/SubpartitionTestBase.java | 42 ++++++++++++++++++++
 8 files changed, 64 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a17d4e82/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 931790a..3b7a2a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -41,7 +41,7 @@ class PipelinedSubpartition extends ResultSubpartition {
 	private boolean isFinished;
 
 	/** Flag indicating whether the subpartition has been released. */
-	private boolean isReleased;
+	private volatile boolean isReleased;
 
 	/**
 	 * A data availability listener. Registered, when the consuming task is faster than the
@@ -167,6 +167,11 @@ class PipelinedSubpartition extends ResultSubpartition {
 	}
 
 	@Override
+	public boolean isReleased() {
+		return isReleased;
+	}
+
+	@Override
 	public PipelinedSubpartitionView createReadView(BufferProvider bufferProvider) {
 		synchronized (buffers) {
 			if (readView != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a17d4e82/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index e9dfe32..b7ca9c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -81,4 +81,6 @@ public abstract class ResultSubpartition {
 
 	abstract int releaseMemory() throws IOException;
 
+	abstract public boolean isReleased();
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a17d4e82/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 4a18691..21e9cc6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -59,7 +59,7 @@ class SpillableSubpartition extends ResultSubpartition {
 	private boolean isFinished;
 
 	/** Flag indicating whether the subpartition has been released. */
-	boolean isReleased;
+	private volatile boolean isReleased;
 
 	/** The read view to consume this subpartition. */
 	private ResultSubpartitionView readView;
@@ -168,6 +168,11 @@ class SpillableSubpartition extends ResultSubpartition {
 	}
 
 	@Override
+	public boolean isReleased() {
+		return isReleased;
+	}
+
+	@Override
 	public ResultSubpartitionView createReadView(BufferProvider bufferProvider) throws IOException {
 		synchronized (buffers) {
 			if (!isFinished) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a17d4e82/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index 972e34b..c9da40a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -73,7 +73,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
 
 		// 1) In-memory
 		synchronized (parent.buffers) {
-			if (parent.isReleased) {
+			if (parent.isReleased()) {
 				return null;
 			}
 
@@ -162,7 +162,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
 
 	@Override
 	public boolean isReleased() {
-		return isReleased.get();
+		return parent.isReleased() || isReleased.get();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a17d4e82/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
index ea5c20b..052a7cd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
@@ -187,7 +187,7 @@ class SpilledSubpartitionViewAsyncIO implements ResultSubpartitionView {
 
 	@Override
 	public boolean isReleased() {
-		return isReleased;
+		return parent.isReleased() || isReleased;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a17d4e82/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java
index 24099a7..5b91668 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java
@@ -108,7 +108,7 @@ class SpilledSubpartitionViewSyncIO implements ResultSubpartitionView {
 
 	@Override
 	public boolean isReleased() {
-		return isReleased.get();
+		return parent.isReleased() || isReleased.get();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a17d4e82/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 8750a1a..6520066 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
 import org.apache.flink.runtime.io.network.util.TestNotificationListener;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
@@ -36,6 +39,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import static org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode.ASYNC;
 import static org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;

http://git-wip-us.apache.org/repos/asf/flink/blob/a17d4e82/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index cb0069b..26a8f29 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -19,10 +19,15 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.util.TestBufferFactory;
+import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
 /**
@@ -68,4 +73,41 @@ public abstract class SubpartitionTestBase extends TestLogger {
 			}
 		}
 	}
+
+	@Test
+	public void testReleaseParent() throws Exception {
+		final ResultSubpartition partition = createSubpartition();
+		verifyViewReleasedAfterParentRelease(partition);
+	}
+
+	@Test
+	public void testReleaseParentAfterSpilled() throws Exception {
+		final ResultSubpartition partition = createSubpartition();
+		partition.releaseMemory();
+
+		verifyViewReleasedAfterParentRelease(partition);
+	}
+
+	private void verifyViewReleasedAfterParentRelease(ResultSubpartition partition) throws Exception {
+		// Add a buffer
+		Buffer buffer = TestBufferFactory.createBuffer();
+		partition.add(buffer);
+		partition.finish();
+
+		TestInfiniteBufferProvider buffers = new TestInfiniteBufferProvider();
+
+		// Create the view
+		ResultSubpartitionView view = partition.createReadView(buffers);
+
+		// The added buffer and end-of-partition event
+		assertNotNull(view.getNextBuffer());
+		assertNotNull(view.getNextBuffer());
+
+		// Release the parent
+		assertFalse(view.isReleased());
+		partition.release();
+
+		// Verify that parent release is reflected at partition view
+		assertTrue(view.isReleased());
+	}
 }