You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/08/26 11:08:02 UTC
flink git commit: [FLINK-2460] [runtime] Check parent state in isReleased() check of partition view
Repository: flink
Updated Branches:
refs/heads/master 5a9daca44 -> a17d4e823
[FLINK-2460] [runtime] Check parent state in isReleased() check of partition view
- Address PR comments
This closes #1051.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a17d4e82
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a17d4e82
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a17d4e82
Branch: refs/heads/master
Commit: a17d4e823fd08e8e49fde169dc3ddd264964d85a
Parents: 5a9daca
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Aug 10 15:15:07 2015 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Aug 26 11:07:03 2015 +0200
----------------------------------------------------------------------
.../partition/PipelinedSubpartition.java | 7 +++-
.../network/partition/ResultSubpartition.java | 2 +
.../partition/SpillableSubpartition.java | 7 +++-
.../partition/SpillableSubpartitionView.java | 4 +-
.../SpilledSubpartitionViewAsyncIO.java | 2 +-
.../SpilledSubpartitionViewSyncIO.java | 2 +-
.../partition/PipelinedSubpartitionTest.java | 4 ++
.../network/partition/SubpartitionTestBase.java | 42 ++++++++++++++++++++
8 files changed, 64 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a17d4e82/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 931790a..3b7a2a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -41,7 +41,7 @@ class PipelinedSubpartition extends ResultSubpartition {
private boolean isFinished;
/** Flag indicating whether the subpartition has been released. */
- private boolean isReleased;
+ private volatile boolean isReleased;
/**
* A data availability listener. Registered, when the consuming task is faster than the
@@ -167,6 +167,11 @@ class PipelinedSubpartition extends ResultSubpartition {
}
@Override
+ public boolean isReleased() {
+ return isReleased;
+ }
+
+ @Override
public PipelinedSubpartitionView createReadView(BufferProvider bufferProvider) {
synchronized (buffers) {
if (readView != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/a17d4e82/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index e9dfe32..b7ca9c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -81,4 +81,6 @@ public abstract class ResultSubpartition {
abstract int releaseMemory() throws IOException;
+ abstract public boolean isReleased();
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a17d4e82/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 4a18691..21e9cc6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -59,7 +59,7 @@ class SpillableSubpartition extends ResultSubpartition {
private boolean isFinished;
/** Flag indicating whether the subpartition has been released. */
- boolean isReleased;
+ private volatile boolean isReleased;
/** The read view to consume this subpartition. */
private ResultSubpartitionView readView;
@@ -168,6 +168,11 @@ class SpillableSubpartition extends ResultSubpartition {
}
@Override
+ public boolean isReleased() {
+ return isReleased;
+ }
+
+ @Override
public ResultSubpartitionView createReadView(BufferProvider bufferProvider) throws IOException {
synchronized (buffers) {
if (!isFinished) {
http://git-wip-us.apache.org/repos/asf/flink/blob/a17d4e82/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index 972e34b..c9da40a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -73,7 +73,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
// 1) In-memory
synchronized (parent.buffers) {
- if (parent.isReleased) {
+ if (parent.isReleased()) {
return null;
}
@@ -162,7 +162,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
@Override
public boolean isReleased() {
- return isReleased.get();
+ return parent.isReleased() || isReleased.get();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a17d4e82/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
index ea5c20b..052a7cd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
@@ -187,7 +187,7 @@ class SpilledSubpartitionViewAsyncIO implements ResultSubpartitionView {
@Override
public boolean isReleased() {
- return isReleased;
+ return parent.isReleased() || isReleased;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a17d4e82/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java
index 24099a7..5b91668 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewSyncIO.java
@@ -108,7 +108,7 @@ class SpilledSubpartitionViewSyncIO implements ResultSubpartitionView {
@Override
public boolean isReleased() {
- return isReleased.get();
+ return parent.isReleased() || isReleased.get();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a17d4e82/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 8750a1a..6520066 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -18,11 +18,14 @@
package org.apache.flink.runtime.io.network.partition;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
import org.apache.flink.runtime.io.network.util.TestNotificationListener;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
@@ -36,6 +39,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import static org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode.ASYNC;
import static org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
http://git-wip-us.apache.org/repos/asf/flink/blob/a17d4e82/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index cb0069b..26a8f29 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -19,10 +19,15 @@
package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.util.TestBufferFactory;
+import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
/**
@@ -68,4 +73,41 @@ public abstract class SubpartitionTestBase extends TestLogger {
}
}
}
+
+ @Test
+ public void testReleaseParent() throws Exception {
+ final ResultSubpartition partition = createSubpartition();
+ verifyViewReleasedAfterParentRelease(partition);
+ }
+
+ @Test
+ public void testReleaseParentAfterSpilled() throws Exception {
+ final ResultSubpartition partition = createSubpartition();
+ partition.releaseMemory();
+
+ verifyViewReleasedAfterParentRelease(partition);
+ }
+
+ private void verifyViewReleasedAfterParentRelease(ResultSubpartition partition) throws Exception {
+ // Add a buffer
+ Buffer buffer = TestBufferFactory.createBuffer();
+ partition.add(buffer);
+ partition.finish();
+
+ TestInfiniteBufferProvider buffers = new TestInfiniteBufferProvider();
+
+ // Create the view
+ ResultSubpartitionView view = partition.createReadView(buffers);
+
+ // The added buffer and end-of-partition event
+ assertNotNull(view.getNextBuffer());
+ assertNotNull(view.getNextBuffer());
+
+ // Release the parent
+ assertFalse(view.isReleased());
+ partition.release();
+
+ // Verify that parent release is reflected at partition view
+ assertTrue(view.isReleased());
+ }
}