You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/19 10:28:24 UTC

[flink] 07/09: [hotfix][network][tests] use assertNextBuffer etc in PipelinedSubpartitionTest

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fb18c28388b3613a19e204f88e4c46d4c0add569
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Thu Sep 13 18:47:25 2018 +0200

    [hotfix][network][tests] use assertNextBuffer etc in PipelinedSubpartitionTest
---
 .../partition/PipelinedSubpartitionTest.java       | 53 +++++++---------------
 1 file changed, 16 insertions(+), 37 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index fc9a643..90bdb82 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestProducerSource;
@@ -254,7 +253,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
 		// Empty => should return null
 		assertFalse(view.nextBufferIsEvent());
-		assertNull(view.getNextBuffer());
+		assertNoNextBuffer(view);
 		assertFalse(view.nextBufferIsEvent()); // also after getNextBuffer()
 		verify(listener, times(0)).notifyDataAvailable();
 
@@ -270,16 +269,10 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		verify(listener, times(1)).notifyDataAvailable();
 
 		// ...and one available result
-		assertFalse(view.nextBufferIsEvent());
-		BufferAndBacklog read = view.getNextBuffer();
-		assertNotNull(read);
-		assertTrue(read.buffer().isBuffer());
+		assertNextBuffer(view, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
 		assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
 		assertEquals(0, subpartition.getBuffersInBacklog());
-		assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
-		assertFalse(read.nextBufferIsEvent());
-		assertFalse(view.nextBufferIsEvent());
-		assertNull(view.getNextBuffer());
+		assertNoNextBuffer(view);
 		assertEquals(0, subpartition.getBuffersInBacklog());
 
 		// Add data to the queue...
@@ -291,21 +284,15 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
 		verify(listener, times(2)).notifyDataAvailable();
 
-		assertFalse(view.nextBufferIsEvent());
-		read = view.getNextBuffer();
-		assertNotNull(read);
-		assertTrue(read.buffer().isBuffer());
+		assertNextBuffer(view, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
 		assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
 		assertEquals(0, subpartition.getBuffersInBacklog());
-		assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
-		assertFalse(read.nextBufferIsEvent());
-		assertFalse(view.nextBufferIsEvent());
-		assertNull(view.getNextBuffer());
+		assertNoNextBuffer(view);
 		assertEquals(0, subpartition.getBuffersInBacklog());
 
 		// some tests with events
 
-		// fill with: buffer, event , and buffer
+		// fill with: buffer, event, and buffer
 		subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
 		assertFalse(view.nextBufferIsEvent());
 		subpartition.add(createEventBufferConsumer(BUFFER_SIZE));
@@ -318,32 +305,24 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
 		verify(listener, times(4)).notifyDataAvailable();
 
-		assertFalse(view.nextBufferIsEvent()); // the first buffer
-		read = view.getNextBuffer();
-		assertNotNull(read);
-		assertTrue(read.buffer().isBuffer());
+		// the first buffer
+		assertNextBuffer(view, BUFFER_SIZE, true, subpartition.getBuffersInBacklog() - 1, true, true);
 		assertEquals(3 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
 		assertEquals(1, subpartition.getBuffersInBacklog());
-		assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
-		assertTrue(read.nextBufferIsEvent());
 
-		assertTrue(view.nextBufferIsEvent()); // the event
-		read = view.getNextBuffer();
-		assertNotNull(read);
-		assertFalse(read.buffer().isBuffer());
+		// the event
+		assertNextEvent(view, BUFFER_SIZE, null, true, subpartition.getBuffersInBacklog(), false, true);
 		assertEquals(4 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
 		assertEquals(1, subpartition.getBuffersInBacklog());
-		assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
-		assertFalse(read.nextBufferIsEvent());
 
-		assertFalse(view.nextBufferIsEvent()); // the remaining buffer
-		read = view.getNextBuffer();
-		assertNotNull(read);
-		assertTrue(read.buffer().isBuffer());
+		// the remaining buffer
+		assertNextBuffer(view, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
 		assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
 		assertEquals(0, subpartition.getBuffersInBacklog());
-		assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
-		assertFalse(read.nextBufferIsEvent());
+
+		// nothing more
+		assertNoNextBuffer(view);
+		assertEquals(0, subpartition.getBuffersInBacklog());
 
 		assertEquals(5, subpartition.getTotalNumberOfBuffers());
 		assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());