You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/01/05 14:21:52 UTC
[2/5] flink git commit: [hotfix] add some more buffer recycling checks in SpillableSubpartitionTest
[hotfix] add some more buffer recycling checks in SpillableSubpartitionTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7fa3b55e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7fa3b55e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7fa3b55e
Branch: refs/heads/master
Commit: 7fa3b55eaad1d7a93d2993405f1e1210e545da0b
Parents: 57cef72
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Nov 23 14:59:18 2017 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Jan 5 14:59:34 2018 +0100
----------------------------------------------------------------------
.../partition/SpillableSubpartitionTest.java | 48 ++++++++++++++++++--
1 file changed, 45 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7fa3b55e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index 2b356a8..3b5c49c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -40,8 +40,12 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
@@ -181,32 +185,53 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
partition.add(buffer);
partition.add(buffer);
+ assertFalse(buffer.isRecycled());
assertEquals(3, partition.releaseMemory());
+ // now the buffer may be freed, depending on the timing of the write operation
+ // -> let's do this check at the end of the test (to save some time)
partition.finish();
- BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class);
+ BufferAvailabilityListener listener = spy(new AwaitableBufferAvailablityListener());
SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener);
verify(listener, times(1)).notifyBuffersAvailable(eq(4L));
Buffer read = reader.getNextBuffer();
assertNotNull(read);
+ assertNotSame(buffer, read);
+ assertFalse(read.isRecycled());
read.recycle();
+ assertTrue(read.isRecycled());
read = reader.getNextBuffer();
assertNotNull(read);
+ assertNotSame(buffer, read);
+ assertFalse(read.isRecycled());
read.recycle();
+ assertTrue(read.isRecycled());
read = reader.getNextBuffer();
assertNotNull(read);
+ assertNotSame(buffer, read);
+ assertFalse(read.isRecycled());
read.recycle();
+ assertTrue(read.isRecycled());
// End of partition
read = reader.getNextBuffer();
assertNotNull(read);
assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(read, ClassLoader.getSystemClassLoader()).getClass());
+ assertFalse(read.isRecycled());
read.recycle();
+ assertTrue(read.isRecycled());
+
+ // finally check that the buffer has been freed after a successful (or failed) write
+ final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs
+ while (!buffer.isRecycled() && System.currentTimeMillis() < deadline) {
+ Thread.sleep(1);
+ }
+ assertTrue(buffer.isRecycled());
}
/**
@@ -231,31 +256,48 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
// Initial notification
assertEquals(1, listener.getNumNotifiedBuffers());
+ assertFalse(buffer.isRecycled());
Buffer read = reader.getNextBuffer();
- assertNotNull(read);
+ assertSame(buffer, read);
read.recycle();
assertEquals(2, listener.getNumNotifiedBuffers());
+ assertFalse(buffer.isRecycled());
// Spill now
assertEquals(2, partition.releaseMemory());
+ assertFalse(buffer.isRecycled()); // still one in the reader!
listener.awaitNotifications(4, 30_000);
assertEquals(4, listener.getNumNotifiedBuffers());
read = reader.getNextBuffer();
- assertNotNull(read);
+ assertSame(buffer, read);
read.recycle();
+ // now the buffer may be freed, depending on the timing of the write operation
+ // -> let's do this check at the end of the test (to save some time)
read = reader.getNextBuffer();
assertNotNull(read);
+ assertNotSame(buffer, read);
+ assertFalse(read.isRecycled());
read.recycle();
+ assertTrue(read.isRecycled());
// End of partition
read = reader.getNextBuffer();
assertNotNull(read);
assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(read, ClassLoader.getSystemClassLoader()).getClass());
+ assertFalse(read.isRecycled());
read.recycle();
+ assertTrue(read.isRecycled());
+
+ // finally check that the buffer has been freed after a successful (or failed) write
+ final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs
+ while (!buffer.isRecycled() && System.currentTimeMillis() < deadline) {
+ Thread.sleep(1);
+ }
+ assertTrue(buffer.isRecycled());
}
private static class AwaitableBufferAvailablityListener implements BufferAvailabilityListener {