You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/19 10:28:23 UTC
[flink] 06/09: [hotfix][network][tests] add
readView.nextBufferIsEvent to assertNextBufferOrEvent()
This is an automated email from the ASF dual-hosted git repository.
nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1abe6aa0940321891233f60905a8152d9ca9c5e9
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Thu Sep 13 18:44:49 2018 +0200
[hotfix][network][tests] add readView.nextBufferIsEvent to assertNextBufferOrEvent()
---
.../network/partition/SpillableSubpartitionTest.java | 20 ++++++--------------
.../io/network/partition/SubpartitionTestBase.java | 2 ++
2 files changed, 8 insertions(+), 14 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index 817795c..57d2cd6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -228,24 +228,20 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener);
assertEquals(1, listener.getNumNotifications());
-
assertFalse(reader.nextBufferIsEvent()); // buffer
+
assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, true);
assertEquals(2, partition.getBuffersInBacklog());
- assertFalse(reader.nextBufferIsEvent()); // buffer
assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
assertEquals(1, partition.getBuffersInBacklog());
- assertTrue(reader.nextBufferIsEvent()); // event
assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true);
assertEquals(1, partition.getBuffersInBacklog());
- assertFalse(reader.nextBufferIsEvent()); // buffer
assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true);
assertEquals(0, partition.getBuffersInBacklog());
- assertTrue(reader.nextBufferIsEvent()); // end of partition event
assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true);
assertEquals(0, partition.getBuffersInBacklog());
@@ -314,24 +310,20 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener);
assertEquals(1, listener.getNumNotifications());
-
assertFalse(reader.nextBufferIsEvent()); // full buffer
+
assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, true);
assertEquals(2, partition.getBuffersInBacklog());
- assertFalse(reader.nextBufferIsEvent()); // full buffer
assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
assertEquals(1, partition.getBuffersInBacklog());
- assertTrue(reader.nextBufferIsEvent()); // event
assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true);
assertEquals(1, partition.getBuffersInBacklog());
- assertFalse(reader.nextBufferIsEvent()); // partial buffer
assertNextBuffer(reader, BUFFER_DATA_SIZE / 2, true, 0, true, true);
assertEquals(0, partition.getBuffersInBacklog());
- assertTrue(reader.nextBufferIsEvent()); // end of partition event
assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true);
assertEquals(0, partition.getBuffersInBacklog());
@@ -370,6 +362,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
assertFalse(bufferConsumer.isRecycled());
assertFalse(reader.nextBufferIsEvent());
+
// first buffer (non-spilled)
assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, false);
assertEquals(BUFFER_DATA_SIZE, partition.getTotalNumberOfBytes()); // only updated when getting/spilling the buffers
@@ -397,19 +390,19 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
Buffer buffer = bufferConsumer.build();
buffer.retainBuffer();
- assertFalse(reader.nextBufferIsEvent()); // second buffer (retained in SpillableSubpartition#nextBuffer)
+ // second buffer (retained in SpillableSubpartition#nextBuffer)
assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, false);
assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // finally integrates the nextBuffer statistics
assertEquals(1, partition.getBuffersInBacklog());
bufferConsumer.close(); // recycle the retained buffer from above (should be the last reference!)
- assertTrue(reader.nextBufferIsEvent()); // the event (spilled)
+ // the event (spilled)
assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true);
assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
assertEquals(1, partition.getBuffersInBacklog());
- assertFalse(reader.nextBufferIsEvent()); // last buffer (spilled)
+ // last buffer (spilled)
assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true);
assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
assertEquals(0, partition.getBuffersInBacklog());
@@ -418,7 +411,6 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
assertTrue(buffer.isRecycled());
// End of partition
- assertTrue(reader.nextBufferIsEvent());
assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true);
assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
assertEquals(0, partition.getBuffersInBacklog());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index 8c90215..5989cf8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -207,6 +207,8 @@ public abstract class SubpartitionTestBase extends TestLogger {
assertEquals("backlog", expectedBuffersInBacklog, bufferAndBacklog.buffersInBacklog());
assertEquals("next is event", expectedNextBufferIsEvent,
bufferAndBacklog.nextBufferIsEvent());
+ assertEquals("next is event", expectedNextBufferIsEvent,
+ readView.nextBufferIsEvent());
assertFalse("not recycled", bufferAndBacklog.buffer().isRecycled());
} finally {