You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/01/11 10:29:37 UTC

[4/4] flink git commit: [FLINK-8371][network] always recycle Buffers when releasing SpillableSubpartition

[FLINK-8371][network] always recycle Buffers when releasing SpillableSubpartition

There were places where Buffer instances were not released upon
SpillableSubpartition#release() with a view attached to a non-spilled
subpartition:

1) SpillableSubpartition#buffer:
  SpillableSubpartition#release() delegates the recycling to the view, but
  SpillableSubpartitionView does not clean up the 'buffers' queue (the
  recycling was only done by the subpartition if there was no view).
2) SpillableSubpartitionView#nextBuffer:
  If this field is populated when the subpartition is released, it will neither
  be given out in subsequent SpillableSubpartitionView#getNextBuffer() calls
  (there was a short path returning 'null' here), nor was it recycled

-> similarly to the PipelinesSubpartition implementation, make
   SpillableSubpartition#release() always clean up and recycle the buffers
-> recycle SpillableSubpartitionView#nextBuffer in
   SpillableSubpartitionView#releaseAllResources()

This closes #5276.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/71ede399
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/71ede399
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/71ede399

Branch: refs/heads/release-1.4
Commit: 71ede3992afe8f6907dd3c6c1e232c5b745048b4
Parents: a316989
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Jan 5 18:18:35 2018 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Jan 11 11:27:53 2018 +0100

----------------------------------------------------------------------
 .../partition/PipelinedSubpartition.java        |  2 -
 .../partition/SpillableSubpartition.java        | 11 ++-
 .../partition/SpillableSubpartitionView.java    |  7 ++
 .../partition/PipelinedSubpartitionTest.java    | 72 ++++++++++++++
 .../partition/SpillableSubpartitionTest.java    | 99 ++++++++++++++++++++
 5 files changed, 185 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/71ede399/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index c1d6f13..92eb7ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -128,7 +128,6 @@ class PipelinedSubpartition extends ResultSubpartition {
 				buffer.recycle();
 			}
 
-			// Get the view...
 			view = readView;
 			readView = null;
 
@@ -138,7 +137,6 @@ class PipelinedSubpartition extends ResultSubpartition {
 
 		LOG.debug("Released {}.", this);
 
-		// Release all resources of the view
 		if (view != null) {
 			view.releaseAllResources();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/71ede399/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 4a8e165..093e9c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -138,6 +138,7 @@ class SpillableSubpartition extends ResultSubpartition {
 
 	@Override
 	public void release() throws IOException {
+		// view reference accessible outside the lock, but assigned inside the locked scope
 		final ResultSubpartitionView view;
 
 		synchronized (buffers) {
@@ -145,16 +146,18 @@ class SpillableSubpartition extends ResultSubpartition {
 				return;
 			}
 
+			// Release all available buffers
+			for (Buffer buffer : buffers) {
+				buffer.recycle();
+			}
+			buffers.clear();
+
 			view = readView;
 
 			// No consumer yet, we are responsible to clean everything up. If
 			// one is available, the view is responsible is to clean up (see
 			// below).
 			if (view == null) {
-				for (Buffer buffer : buffers) {
-					buffer.recycle();
-				}
-				buffers.clear();
 
 				// TODO This can block until all buffers are written out to
 				// disk if a spill is in-progress before deleting the file.

http://git-wip-us.apache.org/repos/asf/flink/blob/71ede399/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index 6781902..f88a6b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -165,6 +165,13 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
 			if (spilled != null) {
 				spilled.releaseAllResources();
 			}
+			// we are never giving this buffer out in getNextBuffer(), so we need to clean it up
+			synchronized (buffers) {
+				if (nextBuffer != null) {
+					nextBuffer.recycle();
+					nextBuffer = null;
+				}
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/71ede399/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 6d36aa6..81ac40a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -19,16 +19,20 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestProducerSource;
 import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
 import org.apache.flink.runtime.io.network.util.TestSubpartitionProducer;
+
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.concurrent.ExecutorService;
@@ -238,4 +242,72 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		producerResult.get();
 		consumerResult.get();
 	}
+
+
+	/**
+	 * Tests cleanup of {@link PipelinedSubpartition#release()} with no read view attached.
+	 */
+	@Test
+	public void testCleanupReleasedPartitionNoView() throws Exception {
+		testCleanupReleasedPartition(false);
+	}
+
+	/**
+	 * Tests cleanup of {@link PipelinedSubpartition#release()} with a read view attached.
+	 */
+	@Test
+	public void testCleanupReleasedPartitionWithView() throws Exception {
+		testCleanupReleasedPartition(true);
+	}
+
+	/**
+	 * Tests cleanup of {@link PipelinedSubpartition#release()}.
+	 *
+	 * @param createView
+	 * 		whether the partition should have a view attached to it (<tt>true</tt>) or not (<tt>false</tt>)
+	 */
+	private void testCleanupReleasedPartition(boolean createView) throws Exception {
+		PipelinedSubpartition partition = createSubpartition();
+
+		Buffer buffer1 = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+			FreeingBufferRecycler.INSTANCE);
+		Buffer buffer2 = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+			FreeingBufferRecycler.INSTANCE);
+		boolean buffer1Recycled;
+		boolean buffer2Recycled;
+		try {
+			partition.add(buffer1);
+			partition.add(buffer2);
+			// create the read view first
+			ResultSubpartitionView view = null;
+			if (createView) {
+				view = partition.createReadView(numBuffers -> {});
+			}
+
+			partition.release();
+
+			assertTrue(partition.isReleased());
+			if (createView) {
+				assertTrue(view.isReleased());
+			}
+			assertTrue(buffer1.isRecycled());
+		} finally {
+			buffer1Recycled = buffer1.isRecycled();
+			if (!buffer1Recycled) {
+				buffer1.recycle();
+			}
+			buffer2Recycled = buffer2.isRecycled();
+			if (!buffer2Recycled) {
+				buffer2.recycle();
+			}
+		}
+		if (!buffer1Recycled) {
+			Assert.fail("buffer 1 not recycled");
+		}
+		if (!buffer2Recycled) {
+			Assert.fail("buffer 2 not recycled");
+		}
+		assertEquals(2, partition.getTotalNumberOfBuffers());
+		assertEquals(2 * 4096, partition.getTotalNumberOfBytes());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/71ede399/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index c50b361..18169b6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -541,6 +541,105 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		}
 	}
 
+	/**
+	 * Tests cleanup of {@link SpillableSubpartition#release()} with a spillable partition and no
+	 * read view attached.
+	 */
+	@Test
+	public void testCleanupReleasedSpillablePartitionNoView() throws Exception {
+		testCleanupReleasedPartition(false, false);
+	}
+
+	/**
+	 * Tests cleanup of {@link SpillableSubpartition#release()} with a spillable partition and a
+	 * read view attached - [FLINK-8371].
+	 */
+	@Test
+	public void testCleanupReleasedSpillablePartitionWithView() throws Exception {
+		testCleanupReleasedPartition(false, true);
+	}
+
+	/**
+	 * Tests cleanup of {@link SpillableSubpartition#release()} with a spilled partition and no
+	 * read view attached.
+	 */
+	@Test
+	public void testCleanupReleasedSpilledPartitionNoView() throws Exception {
+		testCleanupReleasedPartition(true, false);
+	}
+
+	/**
+	 * Tests cleanup of {@link SpillableSubpartition#release()} with a spilled partition and a
+	 * read view attached.
+	 */
+	@Test
+	public void testCleanupReleasedSpilledPartitionWithView() throws Exception {
+		testCleanupReleasedPartition(true, true);
+	}
+
+	/**
+	 * Tests cleanup of {@link SpillableSubpartition#release()}.
+	 *
+	 * @param spilled
+	 * 		whether the partition should be spilled to disk (<tt>true</tt>) or not (<tt>false</tt>,
+	 * 		spillable)
+	 * @param createView
+	 * 		whether the partition should have a view attached to it (<tt>true</tt>) or not (<tt>false</tt>)
+	 */
+	private void testCleanupReleasedPartition(boolean spilled, boolean createView) throws Exception {
+		SpillableSubpartition partition = createSubpartition();
+
+		Buffer buffer1 = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+			FreeingBufferRecycler.INSTANCE);
+		Buffer buffer2 = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+			FreeingBufferRecycler.INSTANCE);
+		boolean buffer1Recycled;
+		boolean buffer2Recycled;
+		try {
+			partition.add(buffer1);
+			partition.add(buffer2);
+			// create the read view before spilling
+			// (tests both code paths since this view may then contain the spilled view)
+			ResultSubpartitionView view = null;
+			if (createView) {
+				partition.finish();
+				view = partition.createReadView(numBuffers -> {});
+			}
+			if (spilled) {
+				// note: in case we create a view, one buffer will already reside in the view and
+				//       one EndOfPartitionEvent will be added instead (so overall the number of
+				//       buffers to spill is the same
+				assertEquals(2, partition.releaseMemory());
+			}
+
+			partition.release();
+
+			assertTrue(partition.isReleased());
+			if (createView) {
+				assertTrue(view.isReleased());
+			}
+			assertTrue(buffer1.isRecycled());
+		} finally {
+			buffer1Recycled = buffer1.isRecycled();
+			if (!buffer1Recycled) {
+				buffer1.recycle();
+			}
+			buffer2Recycled = buffer2.isRecycled();
+			if (!buffer2Recycled) {
+				buffer2.recycle();
+			}
+		}
+		if (!buffer1Recycled) {
+			Assert.fail("buffer 1 not recycled");
+		}
+		if (!buffer2Recycled) {
+			Assert.fail("buffer 2 not recycled");
+		}
+		// note: in case we create a view, there will be an additional EndOfPartitionEvent
+		assertEquals(createView ? 3 : 2, partition.getTotalNumberOfBuffers());
+		assertEquals((createView ? 4 : 0) + 2 * 4096, partition.getTotalNumberOfBytes());
+	}
+
 	private static class AwaitableBufferAvailablityListener implements BufferAvailabilityListener {
 
 		private long numNotifiedBuffers;