You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/01/11 10:29:35 UTC

[2/4] flink git commit: [hotfix] add some more buffer recycling checks in SpillableSubpartitionTest

[hotfix] add some more buffer recycling checks in SpillableSubpartitionTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9398dafa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9398dafa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9398dafa

Branch: refs/heads/release-1.4
Commit: 9398dafae6918488582bc89c1ab73e3e2bd5ea16
Parents: 6f50ed2
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Nov 23 14:59:18 2017 +0100
Committer: Nico Kruber <ni...@data-artisans.com>
Committed: Wed Jan 10 10:42:37 2018 +0100

----------------------------------------------------------------------
 .../partition/SpillableSubpartitionTest.java    | 48 ++++++++++++++++++--
 1 file changed, 45 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9398dafa/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index 2b356a8..3b5c49c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -40,8 +40,12 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
@@ -181,32 +185,53 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		partition.add(buffer);
 		partition.add(buffer);
 
+		assertFalse(buffer.isRecycled());
 		assertEquals(3, partition.releaseMemory());
+		// now the buffer may be freed, depending on the timing of the write operation
+		// -> let's do this check at the end of the test (to save some time)
 
 		partition.finish();
 
-		BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class);
+		BufferAvailabilityListener listener = spy(new AwaitableBufferAvailablityListener());
 		SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener);
 
 		verify(listener, times(1)).notifyBuffersAvailable(eq(4L));
 
 		Buffer read = reader.getNextBuffer();
 		assertNotNull(read);
+		assertNotSame(buffer, read);
+		assertFalse(read.isRecycled());
 		read.recycle();
+		assertTrue(read.isRecycled());
 
 		read = reader.getNextBuffer();
 		assertNotNull(read);
+		assertNotSame(buffer, read);
+		assertFalse(read.isRecycled());
 		read.recycle();
+		assertTrue(read.isRecycled());
 
 		read = reader.getNextBuffer();
 		assertNotNull(read);
+		assertNotSame(buffer, read);
+		assertFalse(read.isRecycled());
 		read.recycle();
+		assertTrue(read.isRecycled());
 
 		// End of partition
 		read = reader.getNextBuffer();
 		assertNotNull(read);
 		assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(read, ClassLoader.getSystemClassLoader()).getClass());
+		assertFalse(read.isRecycled());
 		read.recycle();
+		assertTrue(read.isRecycled());
+
+		// finally check that the buffer has been freed after a successful (or failed) write
+		final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs
+		while (!buffer.isRecycled() && System.currentTimeMillis() < deadline) {
+			Thread.sleep(1);
+		}
+		assertTrue(buffer.isRecycled());
 	}
 
 	/**
@@ -231,31 +256,48 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 
 		// Initial notification
 		assertEquals(1, listener.getNumNotifiedBuffers());
+		assertFalse(buffer.isRecycled());
 
 		Buffer read = reader.getNextBuffer();
-		assertNotNull(read);
+		assertSame(buffer, read);
 		read.recycle();
 		assertEquals(2, listener.getNumNotifiedBuffers());
+		assertFalse(buffer.isRecycled());
 
 		// Spill now
 		assertEquals(2, partition.releaseMemory());
+		assertFalse(buffer.isRecycled()); // still one in the reader!
 
 		listener.awaitNotifications(4, 30_000);
 		assertEquals(4, listener.getNumNotifiedBuffers());
 
 		read = reader.getNextBuffer();
-		assertNotNull(read);
+		assertSame(buffer, read);
 		read.recycle();
+		// now the buffer may be freed, depending on the timing of the write operation
+		// -> let's do this check at the end of the test (to save some time)
 
 		read = reader.getNextBuffer();
 		assertNotNull(read);
+		assertNotSame(buffer, read);
+		assertFalse(read.isRecycled());
 		read.recycle();
+		assertTrue(read.isRecycled());
 
 		// End of partition
 		read = reader.getNextBuffer();
 		assertNotNull(read);
 		assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(read, ClassLoader.getSystemClassLoader()).getClass());
+		assertFalse(read.isRecycled());
 		read.recycle();
+		assertTrue(read.isRecycled());
+
+		// finally check that the buffer has been freed after a successful (or failed) write
+		final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs
+		while (!buffer.isRecycled() && System.currentTimeMillis() < deadline) {
+			Thread.sleep(1);
+		}
+		assertTrue(buffer.isRecycled());
 	}
 
 	private static class AwaitableBufferAvailablityListener implements BufferAvailabilityListener {