You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/01/11 10:29:37 UTC
[4/4] flink git commit: [FLINK-8371][network] always recycle Buffers
when releasing SpillableSubpartition
[FLINK-8371][network] always recycle Buffers when releasing SpillableSubpartition
There were places where Buffer instances were not released upon
SpillableSubpartition#release() with a view attached to a non-spilled
subpartition:
1) SpillableSubpartition#buffer:
SpillableSubpartition#release() delegates the recycling to the view, but
SpillableSubpartitionView does not clean up the 'buffers' queue (the
recycling was only done by the subpartition if there was no view).
2) SpillableSubpartitionView#nextBuffer:
If this field is populated when the subpartition is released, it will neither
be given out in subsequent SpillableSubpartitionView#getNextBuffer() calls
(there was a short path returning 'null' here), nor was it recycled
-> similarly to the PipelinesSubpartition implementation, make
SpillableSubpartition#release() always clean up and recycle the buffers
-> recycle SpillableSubpartitionView#nextBuffer in
SpillableSubpartitionView#releaseAllResources()
This closes #5276.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/71ede399
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/71ede399
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/71ede399
Branch: refs/heads/release-1.4
Commit: 71ede3992afe8f6907dd3c6c1e232c5b745048b4
Parents: a316989
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Jan 5 18:18:35 2018 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Jan 11 11:27:53 2018 +0100
----------------------------------------------------------------------
.../partition/PipelinedSubpartition.java | 2 -
.../partition/SpillableSubpartition.java | 11 ++-
.../partition/SpillableSubpartitionView.java | 7 ++
.../partition/PipelinedSubpartitionTest.java | 72 ++++++++++++++
.../partition/SpillableSubpartitionTest.java | 99 ++++++++++++++++++++
5 files changed, 185 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/71ede399/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index c1d6f13..92eb7ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -128,7 +128,6 @@ class PipelinedSubpartition extends ResultSubpartition {
buffer.recycle();
}
- // Get the view...
view = readView;
readView = null;
@@ -138,7 +137,6 @@ class PipelinedSubpartition extends ResultSubpartition {
LOG.debug("Released {}.", this);
- // Release all resources of the view
if (view != null) {
view.releaseAllResources();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/71ede399/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 4a8e165..093e9c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -138,6 +138,7 @@ class SpillableSubpartition extends ResultSubpartition {
@Override
public void release() throws IOException {
+ // view reference accessible outside the lock, but assigned inside the locked scope
final ResultSubpartitionView view;
synchronized (buffers) {
@@ -145,16 +146,18 @@ class SpillableSubpartition extends ResultSubpartition {
return;
}
+ // Release all available buffers
+ for (Buffer buffer : buffers) {
+ buffer.recycle();
+ }
+ buffers.clear();
+
view = readView;
// No consumer yet, we are responsible to clean everything up. If
// one is available, the view is responsible is to clean up (see
// below).
if (view == null) {
- for (Buffer buffer : buffers) {
- buffer.recycle();
- }
- buffers.clear();
// TODO This can block until all buffers are written out to
// disk if a spill is in-progress before deleting the file.
http://git-wip-us.apache.org/repos/asf/flink/blob/71ede399/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index 6781902..f88a6b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -165,6 +165,13 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
if (spilled != null) {
spilled.releaseAllResources();
}
+ // we are never giving this buffer out in getNextBuffer(), so we need to clean it up
+ synchronized (buffers) {
+ if (nextBuffer != null) {
+ nextBuffer.recycle();
+ nextBuffer = null;
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/71ede399/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 6d36aa6..81ac40a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -19,16 +19,20 @@
package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.runtime.io.network.util.TestProducerSource;
import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
import org.apache.flink.runtime.io.network.util.TestSubpartitionProducer;
+
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.ExecutorService;
@@ -238,4 +242,72 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
producerResult.get();
consumerResult.get();
}
+
+
+ /**
+ * Tests cleanup of {@link PipelinedSubpartition#release()} with no read view attached.
+ */
+ @Test
+ public void testCleanupReleasedPartitionNoView() throws Exception {
+ testCleanupReleasedPartition(false);
+ }
+
+ /**
+ * Tests cleanup of {@link PipelinedSubpartition#release()} with a read view attached.
+ */
+ @Test
+ public void testCleanupReleasedPartitionWithView() throws Exception {
+ testCleanupReleasedPartition(true);
+ }
+
+ /**
+ * Tests cleanup of {@link PipelinedSubpartition#release()}.
+ *
+ * @param createView
+ * whether the partition should have a view attached to it (<tt>true</tt>) or not (<tt>false</tt>)
+ */
+ private void testCleanupReleasedPartition(boolean createView) throws Exception {
+ PipelinedSubpartition partition = createSubpartition();
+
+ Buffer buffer1 = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+ FreeingBufferRecycler.INSTANCE);
+ Buffer buffer2 = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+ FreeingBufferRecycler.INSTANCE);
+ boolean buffer1Recycled;
+ boolean buffer2Recycled;
+ try {
+ partition.add(buffer1);
+ partition.add(buffer2);
+ // create the read view first
+ ResultSubpartitionView view = null;
+ if (createView) {
+ view = partition.createReadView(numBuffers -> {});
+ }
+
+ partition.release();
+
+ assertTrue(partition.isReleased());
+ if (createView) {
+ assertTrue(view.isReleased());
+ }
+ assertTrue(buffer1.isRecycled());
+ } finally {
+ buffer1Recycled = buffer1.isRecycled();
+ if (!buffer1Recycled) {
+ buffer1.recycle();
+ }
+ buffer2Recycled = buffer2.isRecycled();
+ if (!buffer2Recycled) {
+ buffer2.recycle();
+ }
+ }
+ if (!buffer1Recycled) {
+ Assert.fail("buffer 1 not recycled");
+ }
+ if (!buffer2Recycled) {
+ Assert.fail("buffer 2 not recycled");
+ }
+ assertEquals(2, partition.getTotalNumberOfBuffers());
+ assertEquals(2 * 4096, partition.getTotalNumberOfBytes());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/71ede399/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index c50b361..18169b6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -541,6 +541,105 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
}
}
+ /**
+ * Tests cleanup of {@link SpillableSubpartition#release()} with a spillable partition and no
+ * read view attached.
+ */
+ @Test
+ public void testCleanupReleasedSpillablePartitionNoView() throws Exception {
+ testCleanupReleasedPartition(false, false);
+ }
+
+ /**
+ * Tests cleanup of {@link SpillableSubpartition#release()} with a spillable partition and a
+ * read view attached - [FLINK-8371].
+ */
+ @Test
+ public void testCleanupReleasedSpillablePartitionWithView() throws Exception {
+ testCleanupReleasedPartition(false, true);
+ }
+
+ /**
+ * Tests cleanup of {@link SpillableSubpartition#release()} with a spilled partition and no
+ * read view attached.
+ */
+ @Test
+ public void testCleanupReleasedSpilledPartitionNoView() throws Exception {
+ testCleanupReleasedPartition(true, false);
+ }
+
+ /**
+ * Tests cleanup of {@link SpillableSubpartition#release()} with a spilled partition and a
+ * read view attached.
+ */
+ @Test
+ public void testCleanupReleasedSpilledPartitionWithView() throws Exception {
+ testCleanupReleasedPartition(true, true);
+ }
+
+ /**
+ * Tests cleanup of {@link SpillableSubpartition#release()}.
+ *
+ * @param spilled
+ * whether the partition should be spilled to disk (<tt>true</tt>) or not (<tt>false</tt>,
+ * spillable)
+ * @param createView
+ * whether the partition should have a view attached to it (<tt>true</tt>) or not (<tt>false</tt>)
+ */
+ private void testCleanupReleasedPartition(boolean spilled, boolean createView) throws Exception {
+ SpillableSubpartition partition = createSubpartition();
+
+ Buffer buffer1 = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+ FreeingBufferRecycler.INSTANCE);
+ Buffer buffer2 = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+ FreeingBufferRecycler.INSTANCE);
+ boolean buffer1Recycled;
+ boolean buffer2Recycled;
+ try {
+ partition.add(buffer1);
+ partition.add(buffer2);
+ // create the read view before spilling
+ // (tests both code paths since this view may then contain the spilled view)
+ ResultSubpartitionView view = null;
+ if (createView) {
+ partition.finish();
+ view = partition.createReadView(numBuffers -> {});
+ }
+ if (spilled) {
+ // note: in case we create a view, one buffer will already reside in the view and
+ // one EndOfPartitionEvent will be added instead (so overall the number of
+ // buffers to spill is the same
+ assertEquals(2, partition.releaseMemory());
+ }
+
+ partition.release();
+
+ assertTrue(partition.isReleased());
+ if (createView) {
+ assertTrue(view.isReleased());
+ }
+ assertTrue(buffer1.isRecycled());
+ } finally {
+ buffer1Recycled = buffer1.isRecycled();
+ if (!buffer1Recycled) {
+ buffer1.recycle();
+ }
+ buffer2Recycled = buffer2.isRecycled();
+ if (!buffer2Recycled) {
+ buffer2.recycle();
+ }
+ }
+ if (!buffer1Recycled) {
+ Assert.fail("buffer 1 not recycled");
+ }
+ if (!buffer2Recycled) {
+ Assert.fail("buffer 2 not recycled");
+ }
+ // note: in case we create a view, there will be an additional EndOfPartitionEvent
+ assertEquals(createView ? 3 : 2, partition.getTotalNumberOfBuffers());
+ assertEquals((createView ? 4 : 0) + 2 * 4096, partition.getTotalNumberOfBytes());
+ }
+
private static class AwaitableBufferAvailablityListener implements BufferAvailabilityListener {
private long numNotifiedBuffers;