You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/19 10:28:24 UTC
[flink] 07/09: [hotfix][network][tests] use assertNextBuffer etc in PipelinedSubpartitionTest
This is an automated email from the ASF dual-hosted git repository.
nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git
commit fb18c28388b3613a19e204f88e4c46d4c0add569
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Thu Sep 13 18:47:25 2018 +0200
[hotfix][network][tests] use assertNextBuffer etc in PipelinedSubpartitionTest
---
.../partition/PipelinedSubpartitionTest.java | 53 +++++++---------------
1 file changed, 16 insertions(+), 37 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index fc9a643..90bdb82 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.runtime.io.network.util.TestProducerSource;
@@ -254,7 +253,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
// Empty => should return null
assertFalse(view.nextBufferIsEvent());
- assertNull(view.getNextBuffer());
+ assertNoNextBuffer(view);
assertFalse(view.nextBufferIsEvent()); // also after getNextBuffer()
verify(listener, times(0)).notifyDataAvailable();
@@ -270,16 +269,10 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
verify(listener, times(1)).notifyDataAvailable();
// ...and one available result
- assertFalse(view.nextBufferIsEvent());
- BufferAndBacklog read = view.getNextBuffer();
- assertNotNull(read);
- assertTrue(read.buffer().isBuffer());
+ assertNextBuffer(view, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
assertEquals(0, subpartition.getBuffersInBacklog());
- assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
- assertFalse(read.nextBufferIsEvent());
- assertFalse(view.nextBufferIsEvent());
- assertNull(view.getNextBuffer());
+ assertNoNextBuffer(view);
assertEquals(0, subpartition.getBuffersInBacklog());
// Add data to the queue...
@@ -291,21 +284,15 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
verify(listener, times(2)).notifyDataAvailable();
- assertFalse(view.nextBufferIsEvent());
- read = view.getNextBuffer();
- assertNotNull(read);
- assertTrue(read.buffer().isBuffer());
+ assertNextBuffer(view, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
assertEquals(0, subpartition.getBuffersInBacklog());
- assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
- assertFalse(read.nextBufferIsEvent());
- assertFalse(view.nextBufferIsEvent());
- assertNull(view.getNextBuffer());
+ assertNoNextBuffer(view);
assertEquals(0, subpartition.getBuffersInBacklog());
// some tests with events
- // fill with: buffer, event , and buffer
+ // fill with: buffer, event, and buffer
subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
assertFalse(view.nextBufferIsEvent());
subpartition.add(createEventBufferConsumer(BUFFER_SIZE));
@@ -318,32 +305,24 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
verify(listener, times(4)).notifyDataAvailable();
- assertFalse(view.nextBufferIsEvent()); // the first buffer
- read = view.getNextBuffer();
- assertNotNull(read);
- assertTrue(read.buffer().isBuffer());
+ // the first buffer
+ assertNextBuffer(view, BUFFER_SIZE, true, subpartition.getBuffersInBacklog() - 1, true, true);
assertEquals(3 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
assertEquals(1, subpartition.getBuffersInBacklog());
- assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
- assertTrue(read.nextBufferIsEvent());
- assertTrue(view.nextBufferIsEvent()); // the event
- read = view.getNextBuffer();
- assertNotNull(read);
- assertFalse(read.buffer().isBuffer());
+ // the event
+ assertNextEvent(view, BUFFER_SIZE, null, true, subpartition.getBuffersInBacklog(), false, true);
assertEquals(4 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
assertEquals(1, subpartition.getBuffersInBacklog());
- assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
- assertFalse(read.nextBufferIsEvent());
- assertFalse(view.nextBufferIsEvent()); // the remaining buffer
- read = view.getNextBuffer();
- assertNotNull(read);
- assertTrue(read.buffer().isBuffer());
+ // the remaining buffer
+ assertNextBuffer(view, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
assertEquals(0, subpartition.getBuffersInBacklog());
- assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
- assertFalse(read.nextBufferIsEvent());
+
+ // nothing more
+ assertNoNextBuffer(view);
+ assertEquals(0, subpartition.getBuffersInBacklog());
assertEquals(5, subpartition.getTotalNumberOfBuffers());
assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());