You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/19 19:32:51 UTC

[flink] 06/11: [hotfix][network][tests] add readView.nextBufferIsEvent to assertNextBufferOrEvent()

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d9c49c17491a0a18eb267fa476c98e5359a560a6
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Thu Sep 13 18:44:49 2018 +0200

    [hotfix][network][tests] add readView.nextBufferIsEvent to assertNextBufferOrEvent()
---
 .../network/partition/SpillableSubpartitionTest.java | 20 ++++++--------------
 .../io/network/partition/SubpartitionTestBase.java   |  2 ++
 2 files changed, 8 insertions(+), 14 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index 817795c..57d2cd6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -228,24 +228,20 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener);
 
 		assertEquals(1, listener.getNumNotifications());
-
 		assertFalse(reader.nextBufferIsEvent()); // buffer
+
 		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, true);
 		assertEquals(2, partition.getBuffersInBacklog());
 
-		assertFalse(reader.nextBufferIsEvent()); // buffer
 		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
 		assertEquals(1, partition.getBuffersInBacklog());
 
-		assertTrue(reader.nextBufferIsEvent()); // event
 		assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true);
 		assertEquals(1, partition.getBuffersInBacklog());
 
-		assertFalse(reader.nextBufferIsEvent()); // buffer
 		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true);
 		assertEquals(0, partition.getBuffersInBacklog());
 
-		assertTrue(reader.nextBufferIsEvent()); // end of partition event
 		assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true);
 		assertEquals(0, partition.getBuffersInBacklog());
 
@@ -314,24 +310,20 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener);
 
 		assertEquals(1, listener.getNumNotifications());
-
 		assertFalse(reader.nextBufferIsEvent()); // full buffer
+
 		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, true);
 		assertEquals(2, partition.getBuffersInBacklog());
 
-		assertFalse(reader.nextBufferIsEvent()); // full buffer
 		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
 		assertEquals(1, partition.getBuffersInBacklog());
 
-		assertTrue(reader.nextBufferIsEvent()); // event
 		assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true);
 		assertEquals(1, partition.getBuffersInBacklog());
 
-		assertFalse(reader.nextBufferIsEvent()); // partial buffer
 		assertNextBuffer(reader, BUFFER_DATA_SIZE / 2, true, 0, true, true);
 		assertEquals(0, partition.getBuffersInBacklog());
 
-		assertTrue(reader.nextBufferIsEvent()); // end of partition event
 		assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true);
 		assertEquals(0, partition.getBuffersInBacklog());
 
@@ -370,6 +362,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		assertFalse(bufferConsumer.isRecycled());
 
 		assertFalse(reader.nextBufferIsEvent());
+
 		// first buffer (non-spilled)
 		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, false);
 		assertEquals(BUFFER_DATA_SIZE, partition.getTotalNumberOfBytes()); // only updated when getting/spilling the buffers
@@ -397,19 +390,19 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		Buffer buffer = bufferConsumer.build();
 		buffer.retainBuffer();
 
-		assertFalse(reader.nextBufferIsEvent()); // second buffer (retained in SpillableSubpartition#nextBuffer)
+		// second buffer (retained in SpillableSubpartition#nextBuffer)
 		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, false);
 		assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // finally integrates the nextBuffer statistics
 		assertEquals(1, partition.getBuffersInBacklog());
 
 		bufferConsumer.close(); // recycle the retained buffer from above (should be the last reference!)
 
-		assertTrue(reader.nextBufferIsEvent()); // the event (spilled)
+		// the event (spilled)
 		assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true);
 		assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
 		assertEquals(1, partition.getBuffersInBacklog());
 
-		assertFalse(reader.nextBufferIsEvent()); // last buffer (spilled)
+		// last buffer (spilled)
 		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true);
 		assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
 		assertEquals(0, partition.getBuffersInBacklog());
@@ -418,7 +411,6 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		assertTrue(buffer.isRecycled());
 
 		// End of partition
-		assertTrue(reader.nextBufferIsEvent());
 		assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true);
 		assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
 		assertEquals(0, partition.getBuffersInBacklog());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index 8c90215..5989cf8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -207,6 +207,8 @@ public abstract class SubpartitionTestBase extends TestLogger {
 			assertEquals("backlog", expectedBuffersInBacklog, bufferAndBacklog.buffersInBacklog());
 			assertEquals("next is event", expectedNextBufferIsEvent,
 				bufferAndBacklog.nextBufferIsEvent());
+			assertEquals("next is event", expectedNextBufferIsEvent,
+				readView.nextBufferIsEvent());
 
 			assertFalse("not recycled", bufferAndBacklog.buffer().isRecycled());
 		} finally {