You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/01/11 10:29:35 UTC
[2/4] flink git commit: [hotfix] add some more buffer recycling checks in SpillableSubpartitionTest
[hotfix] add some more buffer recycling checks in SpillableSubpartitionTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9398dafa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9398dafa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9398dafa
Branch: refs/heads/release-1.4
Commit: 9398dafae6918488582bc89c1ab73e3e2bd5ea16
Parents: 6f50ed2
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Nov 23 14:59:18 2017 +0100
Committer: Nico Kruber <ni...@data-artisans.com>
Committed: Wed Jan 10 10:42:37 2018 +0100
----------------------------------------------------------------------
.../partition/SpillableSubpartitionTest.java | 48 ++++++++++++++++++--
1 file changed, 45 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9398dafa/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index 2b356a8..3b5c49c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -40,8 +40,12 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
@@ -181,32 +185,53 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
partition.add(buffer);
partition.add(buffer);
+ assertFalse(buffer.isRecycled());
assertEquals(3, partition.releaseMemory());
+ // now the buffer may be freed, depending on the timing of the write operation
+ // -> let's do this check at the end of the test (to save some time)
partition.finish();
- BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class);
+ BufferAvailabilityListener listener = spy(new AwaitableBufferAvailablityListener());
SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener);
verify(listener, times(1)).notifyBuffersAvailable(eq(4L));
Buffer read = reader.getNextBuffer();
assertNotNull(read);
+ assertNotSame(buffer, read);
+ assertFalse(read.isRecycled());
read.recycle();
+ assertTrue(read.isRecycled());
read = reader.getNextBuffer();
assertNotNull(read);
+ assertNotSame(buffer, read);
+ assertFalse(read.isRecycled());
read.recycle();
+ assertTrue(read.isRecycled());
read = reader.getNextBuffer();
assertNotNull(read);
+ assertNotSame(buffer, read);
+ assertFalse(read.isRecycled());
read.recycle();
+ assertTrue(read.isRecycled());
// End of partition
read = reader.getNextBuffer();
assertNotNull(read);
assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(read, ClassLoader.getSystemClassLoader()).getClass());
+ assertFalse(read.isRecycled());
read.recycle();
+ assertTrue(read.isRecycled());
+
+ // finally check that the buffer has been freed after a successful (or failed) write
+ final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs
+ while (!buffer.isRecycled() && System.currentTimeMillis() < deadline) {
+ Thread.sleep(1);
+ }
+ assertTrue(buffer.isRecycled());
}
/**
@@ -231,31 +256,48 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
// Initial notification
assertEquals(1, listener.getNumNotifiedBuffers());
+ assertFalse(buffer.isRecycled());
Buffer read = reader.getNextBuffer();
- assertNotNull(read);
+ assertSame(buffer, read);
read.recycle();
assertEquals(2, listener.getNumNotifiedBuffers());
+ assertFalse(buffer.isRecycled());
// Spill now
assertEquals(2, partition.releaseMemory());
+ assertFalse(buffer.isRecycled()); // still one in the reader!
listener.awaitNotifications(4, 30_000);
assertEquals(4, listener.getNumNotifiedBuffers());
read = reader.getNextBuffer();
- assertNotNull(read);
+ assertSame(buffer, read);
read.recycle();
+ // now the buffer may be freed, depending on the timing of the write operation
+ // -> let's do this check at the end of the test (to save some time)
read = reader.getNextBuffer();
assertNotNull(read);
+ assertNotSame(buffer, read);
+ assertFalse(read.isRecycled());
read.recycle();
+ assertTrue(read.isRecycled());
// End of partition
read = reader.getNextBuffer();
assertNotNull(read);
assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(read, ClassLoader.getSystemClassLoader()).getClass());
+ assertFalse(read.isRecycled());
read.recycle();
+ assertTrue(read.isRecycled());
+
+ // finally check that the buffer has been freed after a successful (or failed) write
+ final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs
+ while (!buffer.isRecycled() && System.currentTimeMillis() < deadline) {
+ Thread.sleep(1);
+ }
+ assertTrue(buffer.isRecycled());
}
private static class AwaitableBufferAvailablityListener implements BufferAvailabilityListener {