You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/19 10:28:26 UTC
[flink] 09/09: [hotfix][network][tests] split
PipelinedSubpartitionTest for better initialization
This is an automated email from the ASF dual-hosted git repository.
nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 915db25ea691258d4eae7ffff04580946a7c8afd
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Tue Sep 18 12:10:21 2018 +0200
[hotfix][network][tests] split PipelinedSubpartitionTest for better initialization
- add PipelinedSubpartitionWithReadViewTest which always creates a subpartition,
an availability listener, and a read view before each test and cleans up after
each test
- remove mockito use from testBasicPipelinedProduceConsumeLogic()
---
.../partition/PipelinedSubpartitionTest.java | 314 +--------------------
.../PipelinedSubpartitionWithReadViewTest.java | 276 ++++++++++++++++++
.../io/network/partition/SubpartitionTestBase.java | 2 +-
3 files changed, 292 insertions(+), 300 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index b75bb7a..82f61ab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -40,19 +40,12 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
-import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createEventBufferConsumer;
-import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferBuilder;
import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
-import static org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE;
import static org.apache.flink.util.FutureUtil.waitForAll;
import static org.apache.flink.util.Preconditions.checkState;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
@@ -62,6 +55,8 @@ import static org.mockito.Mockito.when;
/**
* Tests for {@link PipelinedSubpartition}.
+ *
+ * @see PipelinedSubpartitionWithReadViewTest
*/
public class PipelinedSubpartitionTest extends SubpartitionTestBase {
@@ -80,189 +75,6 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
return new PipelinedSubpartition(0, parent);
}
- @Test(expected = IllegalStateException.class)
- public void testAddTwoNonFinishedBuffer() throws Exception {
- final ResultSubpartition subpartition = createSubpartition();
- AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
- ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
- availablityListener.resetNotificationCounters();
-
- try {
- subpartition.add(createBufferBuilder().createBufferConsumer());
- subpartition.add(createBufferBuilder().createBufferConsumer());
- assertNull(readView.getNextBuffer());
- } finally {
- subpartition.release();
- }
- }
-
- @Test
- public void testAddEmptyNonFinishedBuffer() throws Exception {
- final ResultSubpartition subpartition = createSubpartition();
- AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
- ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
- availablityListener.resetNotificationCounters();
-
- try {
- assertEquals(0, availablityListener.getNumNotifications());
-
- BufferBuilder bufferBuilder = createBufferBuilder();
- subpartition.add(bufferBuilder.createBufferConsumer());
-
- assertEquals(0, availablityListener.getNumNotifications());
- assertNull(readView.getNextBuffer());
-
- bufferBuilder.finish();
- bufferBuilder = createBufferBuilder();
- subpartition.add(bufferBuilder.createBufferConsumer());
-
- assertEquals(1, availablityListener.getNumNotifications()); // notification from finishing previous buffer.
- assertNull(readView.getNextBuffer());
- assertEquals(1, subpartition.getBuffersInBacklog());
- } finally {
- readView.releaseAllResources();
- subpartition.release();
- }
- }
-
- @Test
- public void testAddNonEmptyNotFinishedBuffer() throws Exception {
- final ResultSubpartition subpartition = createSubpartition();
- AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
- ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
- availablityListener.resetNotificationCounters();
-
- try {
- assertEquals(0, availablityListener.getNumNotifications());
-
- BufferBuilder bufferBuilder = createBufferBuilder();
- bufferBuilder.appendAndCommit(ByteBuffer.allocate(1024));
- subpartition.add(bufferBuilder.createBufferConsumer());
-
- // note that since the buffer builder is not finished, there is still a retained instance!
- assertNextBuffer(readView, 1024, false, 1, false, false);
- assertEquals(1, subpartition.getBuffersInBacklog());
- } finally {
- readView.releaseAllResources();
- subpartition.release();
- }
- }
-
- /**
- * Normally moreAvailable flag from InputChannel should ignore non finished BufferConsumers, otherwise we would
- * busy loop on the unfinished BufferConsumers.
- */
- @Test
- public void testUnfinishedBufferBehindFinished() throws Exception {
- final ResultSubpartition subpartition = createSubpartition();
- AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
- ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
-
- try {
- subpartition.add(createFilledBufferConsumer(1025)); // finished
- subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
-
- assertThat(availablityListener.getNumNotifications(), greaterThan(0L));
- assertNextBuffer(readView, 1025, false, 1, false, true);
- // not notified, but we could still access the unfinished buffer
- assertNextBuffer(readView, 1024, false, 1, false, false);
- assertNoNextBuffer(readView);
- } finally {
- subpartition.release();
- }
- }
-
- /**
- * After flush call unfinished BufferConsumers should be reported as available, otherwise we might not flush some
- * of the data.
- */
- @Test
- public void testFlushWithUnfinishedBufferBehindFinished() throws Exception {
- final ResultSubpartition subpartition = createSubpartition();
- AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
- ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
-
- try {
- subpartition.add(createFilledBufferConsumer(1025)); // finished
- subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
- long oldNumNotifications = availablityListener.getNumNotifications();
- subpartition.flush();
- // buffer queue is > 1, should already be notified, no further notification necessary
- assertThat(oldNumNotifications, greaterThan(0L));
- assertEquals(oldNumNotifications, availablityListener.getNumNotifications());
-
- assertNextBuffer(readView, 1025, true, 1, false, true);
- assertNextBuffer(readView, 1024, false, 1, false, false);
- assertNoNextBuffer(readView);
- } finally {
- subpartition.release();
- }
- }
-
- /**
- * A flush call with a buffer size of 1 should always notify consumers (unless already flushed).
- */
- @Test
- public void testFlushWithUnfinishedBufferBehindFinished2() throws Exception {
- final ResultSubpartition subpartition = createSubpartition();
- AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
- ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
-
- try {
- // no buffers -> no notification or any other effects
- subpartition.flush();
- assertEquals(0, availablityListener.getNumNotifications());
-
- subpartition.add(createFilledBufferConsumer(1025)); // finished
- subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
-
- assertNextBuffer(readView, 1025, false, 1, false, true);
-
- long oldNumNotifications = availablityListener.getNumNotifications();
- subpartition.flush();
- // buffer queue is 1 again -> need to flush
- assertEquals(oldNumNotifications + 1, availablityListener.getNumNotifications());
- subpartition.flush();
- // calling again should not flush again
- assertEquals(oldNumNotifications + 1, availablityListener.getNumNotifications());
-
- assertNextBuffer(readView, 1024, false, 1, false, false);
- assertNoNextBuffer(readView);
- } finally {
- subpartition.release();
- }
- }
-
- @Test
- public void testMultipleEmptyBuffers() throws Exception {
- final ResultSubpartition subpartition = createSubpartition();
- AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
- ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
- availablityListener.resetNotificationCounters();
-
- try {
- assertEquals(0, availablityListener.getNumNotifications());
-
- subpartition.add(createFilledBufferConsumer(0));
-
- assertEquals(1, availablityListener.getNumNotifications());
- subpartition.add(createFilledBufferConsumer(0));
- assertEquals(2, availablityListener.getNumNotifications());
-
- subpartition.add(createFilledBufferConsumer(0));
- assertEquals(2, availablityListener.getNumNotifications());
- assertEquals(3, subpartition.getBuffersInBacklog());
-
- subpartition.add(createFilledBufferConsumer(1024));
- assertEquals(2, availablityListener.getNumNotifications());
-
- assertNextBuffer(readView, 1024, false, 0, false, true);
- } finally {
- readView.releaseAllResources();
- subpartition.release();
- }
- }
-
@Test
public void testIllegalReadViewRequest() throws Exception {
final PipelinedSubpartition subpartition = createSubpartition();
@@ -278,100 +90,23 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
}
}
+ /**
+ * Verifies that the isReleased() check of the view checks the parent
+ * subpartition.
+ */
@Test
- public void testEmptyFlush() throws Exception {
- final PipelinedSubpartition subpartition = createSubpartition();
+ public void testIsReleasedChecksParent() {
+ PipelinedSubpartition subpartition = mock(PipelinedSubpartition.class);
- AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener();
- subpartition.createReadView(listener);
- subpartition.flush();
- assertEquals(0, listener.getNumNotifications());
- }
+ PipelinedSubpartitionView reader = new PipelinedSubpartitionView(
+ subpartition, mock(BufferAvailabilityListener.class));
- @Test
- public void testBasicPipelinedProduceConsumeLogic() throws Exception {
- final PipelinedSubpartition subpartition = createSubpartition();
+ assertFalse(reader.isReleased());
+ verify(subpartition, times(1)).isReleased();
- BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class);
-
- ResultSubpartitionView view = subpartition.createReadView(listener);
-
- // Empty => should return null
- assertFalse(view.nextBufferIsEvent());
- assertNoNextBuffer(view);
- assertFalse(view.nextBufferIsEvent()); // also after getNextBuffer()
- verify(listener, times(0)).notifyDataAvailable();
-
- // Add data to the queue...
- subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
- assertFalse(view.nextBufferIsEvent());
-
- assertEquals(1, subpartition.getTotalNumberOfBuffers());
- assertEquals(1, subpartition.getBuffersInBacklog());
- assertEquals(0, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-
- // ...should have resulted in a notification
- verify(listener, times(1)).notifyDataAvailable();
-
- // ...and one available result
- assertNextBuffer(view, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
- assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
- assertEquals(0, subpartition.getBuffersInBacklog());
- assertNoNextBuffer(view);
- assertEquals(0, subpartition.getBuffersInBacklog());
-
- // Add data to the queue...
- subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
- assertFalse(view.nextBufferIsEvent());
-
- assertEquals(2, subpartition.getTotalNumberOfBuffers());
- assertEquals(1, subpartition.getBuffersInBacklog());
- assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
- verify(listener, times(2)).notifyDataAvailable();
-
- assertNextBuffer(view, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
- assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
- assertEquals(0, subpartition.getBuffersInBacklog());
- assertNoNextBuffer(view);
- assertEquals(0, subpartition.getBuffersInBacklog());
-
- // some tests with events
-
- // fill with: buffer, event, and buffer
- subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
- assertFalse(view.nextBufferIsEvent());
- subpartition.add(createEventBufferConsumer(BUFFER_SIZE));
- assertFalse(view.nextBufferIsEvent());
- subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
- assertFalse(view.nextBufferIsEvent());
-
- assertEquals(5, subpartition.getTotalNumberOfBuffers());
- assertEquals(2, subpartition.getBuffersInBacklog()); // two buffers (events don't count)
- assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
- verify(listener, times(4)).notifyDataAvailable();
-
- // the first buffer
- assertNextBuffer(view, BUFFER_SIZE, true, subpartition.getBuffersInBacklog() - 1, true, true);
- assertEquals(3 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
- assertEquals(1, subpartition.getBuffersInBacklog());
-
- // the event
- assertNextEvent(view, BUFFER_SIZE, null, true, subpartition.getBuffersInBacklog(), false, true);
- assertEquals(4 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
- assertEquals(1, subpartition.getBuffersInBacklog());
-
- // the remaining buffer
- assertNextBuffer(view, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
- assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
- assertEquals(0, subpartition.getBuffersInBacklog());
-
- // nothing more
- assertNoNextBuffer(view);
- assertEquals(0, subpartition.getBuffersInBacklog());
-
- assertEquals(5, subpartition.getTotalNumberOfBuffers());
- assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
- verify(listener, times(4)).notifyDataAvailable();
+ when(subpartition.isReleased()).thenReturn(true);
+ assertTrue(reader.isReleased());
+ verify(subpartition, times(2)).isReleased();
}
@Test
@@ -394,25 +129,6 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
testProduceConsume(true, true);
}
- /**
- * Verifies that the isReleased() check of the view checks the parent
- * subpartition.
- */
- @Test
- public void testIsReleasedChecksParent() throws Exception {
- PipelinedSubpartition subpartition = mock(PipelinedSubpartition.class);
-
- PipelinedSubpartitionView reader = new PipelinedSubpartitionView(
- subpartition, mock(BufferAvailabilityListener.class));
-
- assertFalse(reader.isReleased());
- verify(subpartition, times(1)).isReleased();
-
- when(subpartition.isReleased()).thenReturn(true);
- assertTrue(reader.isReleased());
- verify(subpartition, times(2)).isReleased();
- }
-
private void testProduceConsume(boolean isSlowProducer, boolean isSlowConsumer) throws Exception {
// Config
final int producerBufferPoolSize = 8;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
new file mode 100644
index 0000000..6f9920e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createEventBufferConsumer;
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferBuilder;
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
+import static org.apache.flink.runtime.io.network.partition.SubpartitionTestBase.assertNextBuffer;
+import static org.apache.flink.runtime.io.network.partition.SubpartitionTestBase.assertNextEvent;
+import static org.apache.flink.runtime.io.network.partition.SubpartitionTestBase.assertNoNextBuffer;
+import static org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Additional tests for {@link PipelinedSubpartition} which require an availability listener and a
+ * read view.
+ *
+ * @see PipelinedSubpartitionTest
+ */
+public class PipelinedSubpartitionWithReadViewTest {
+
+ private PipelinedSubpartition subpartition;
+ private AwaitableBufferAvailablityListener availablityListener;
+ private PipelinedSubpartitionView readView;
+
+ @Before
+ public void setup() throws IOException {
+ final ResultPartition parent = mock(ResultPartition.class);
+ subpartition = new PipelinedSubpartition(0, parent);
+ availablityListener = new AwaitableBufferAvailablityListener();
+ readView = subpartition.createReadView(availablityListener);
+ }
+
+ @After
+ public void tearDown() {
+ readView.releaseAllResources();
+ subpartition.release();
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testAddTwoNonFinishedBuffer() {
+ subpartition.add(createBufferBuilder().createBufferConsumer());
+ subpartition.add(createBufferBuilder().createBufferConsumer());
+ assertNull(readView.getNextBuffer());
+ }
+
+ @Test
+ public void testAddEmptyNonFinishedBuffer() {
+ assertEquals(0, availablityListener.getNumNotifications());
+
+ BufferBuilder bufferBuilder = createBufferBuilder();
+ subpartition.add(bufferBuilder.createBufferConsumer());
+
+ assertEquals(0, availablityListener.getNumNotifications());
+ assertNull(readView.getNextBuffer());
+
+ bufferBuilder.finish();
+ bufferBuilder = createBufferBuilder();
+ subpartition.add(bufferBuilder.createBufferConsumer());
+
+ assertEquals(1, availablityListener.getNumNotifications()); // notification from finishing previous buffer.
+ assertNull(readView.getNextBuffer());
+ assertEquals(1, subpartition.getBuffersInBacklog());
+ }
+
+ @Test
+ public void testAddNonEmptyNotFinishedBuffer() throws Exception {
+ assertEquals(0, availablityListener.getNumNotifications());
+
+ BufferBuilder bufferBuilder = createBufferBuilder();
+ bufferBuilder.appendAndCommit(ByteBuffer.allocate(1024));
+ subpartition.add(bufferBuilder.createBufferConsumer());
+
+ // note that since the buffer builder is not finished, there is still a retained instance!
+ assertNextBuffer(readView, 1024, false, 1, false, false);
+ assertEquals(1, subpartition.getBuffersInBacklog());
+ }
+
+ /**
+ * Normally moreAvailable flag from InputChannel should ignore non finished BufferConsumers, otherwise we would
+ * busy loop on the unfinished BufferConsumers.
+ */
+ @Test
+ public void testUnfinishedBufferBehindFinished() throws Exception {
+ subpartition.add(createFilledBufferConsumer(1025)); // finished
+ subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+
+ assertThat(availablityListener.getNumNotifications(), greaterThan(0L));
+ assertNextBuffer(readView, 1025, false, 1, false, true);
+ // not notified, but we could still access the unfinished buffer
+ assertNextBuffer(readView, 1024, false, 1, false, false);
+ assertNoNextBuffer(readView);
+ }
+
+ /**
+ * After flush call unfinished BufferConsumers should be reported as available, otherwise we might not flush some
+ * of the data.
+ */
+ @Test
+ public void testFlushWithUnfinishedBufferBehindFinished() throws Exception {
+ subpartition.add(createFilledBufferConsumer(1025)); // finished
+ subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+ long oldNumNotifications = availablityListener.getNumNotifications();
+ subpartition.flush();
+ // buffer queue is > 1, should already be notified, no further notification necessary
+ assertThat(oldNumNotifications, greaterThan(0L));
+ assertEquals(oldNumNotifications, availablityListener.getNumNotifications());
+
+ assertNextBuffer(readView, 1025, true, 1, false, true);
+ assertNextBuffer(readView, 1024, false, 1, false, false);
+ assertNoNextBuffer(readView);
+ }
+
+ /**
+ * A flush call with a buffer size of 1 should always notify consumers (unless already flushed).
+ */
+ @Test
+ public void testFlushWithUnfinishedBufferBehindFinished2() throws Exception {
+ // no buffers -> no notification or any other effects
+ subpartition.flush();
+ assertEquals(0, availablityListener.getNumNotifications());
+
+ subpartition.add(createFilledBufferConsumer(1025)); // finished
+ subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+
+ assertNextBuffer(readView, 1025, false, 1, false, true);
+
+ long oldNumNotifications = availablityListener.getNumNotifications();
+ subpartition.flush();
+ // buffer queue is 1 again -> need to flush
+ assertEquals(oldNumNotifications + 1, availablityListener.getNumNotifications());
+ subpartition.flush();
+ // calling again should not flush again
+ assertEquals(oldNumNotifications + 1, availablityListener.getNumNotifications());
+
+ assertNextBuffer(readView, 1024, false, 1, false, false);
+ assertNoNextBuffer(readView);
+ }
+
+ @Test
+ public void testMultipleEmptyBuffers() throws Exception {
+ assertEquals(0, availablityListener.getNumNotifications());
+
+ subpartition.add(createFilledBufferConsumer(0));
+
+ assertEquals(1, availablityListener.getNumNotifications());
+ subpartition.add(createFilledBufferConsumer(0));
+ assertEquals(2, availablityListener.getNumNotifications());
+
+ subpartition.add(createFilledBufferConsumer(0));
+ assertEquals(2, availablityListener.getNumNotifications());
+ assertEquals(3, subpartition.getBuffersInBacklog());
+
+ subpartition.add(createFilledBufferConsumer(1024));
+ assertEquals(2, availablityListener.getNumNotifications());
+
+ assertNextBuffer(readView, 1024, false, 0, false, true);
+ }
+
+ @Test
+ public void testEmptyFlush() {
+ subpartition.flush();
+ assertEquals(0, availablityListener.getNumNotifications());
+ }
+
+ @Test
+ public void testBasicPipelinedProduceConsumeLogic() throws Exception {
+ // Empty => should return null
+ assertFalse(readView.nextBufferIsEvent());
+ assertNoNextBuffer(readView);
+ assertFalse(readView.nextBufferIsEvent()); // also after getNextBuffer()
+ assertEquals(0, availablityListener.getNumNotifications());
+
+ // Add data to the queue...
+ subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
+ assertFalse(readView.nextBufferIsEvent());
+
+ assertEquals(1, subpartition.getTotalNumberOfBuffers());
+ assertEquals(1, subpartition.getBuffersInBacklog());
+ assertEquals(0, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+
+ // ...should have resulted in a notification
+ assertEquals(1, availablityListener.getNumNotifications());
+
+ // ...and one available result
+ assertNextBuffer(readView, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
+ assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+ assertEquals(0, subpartition.getBuffersInBacklog());
+ assertNoNextBuffer(readView);
+ assertEquals(0, subpartition.getBuffersInBacklog());
+
+ // Add data to the queue...
+ subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
+ assertFalse(readView.nextBufferIsEvent());
+
+ assertEquals(2, subpartition.getTotalNumberOfBuffers());
+ assertEquals(1, subpartition.getBuffersInBacklog());
+ assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+ assertEquals(2, availablityListener.getNumNotifications());
+
+ assertNextBuffer(readView, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
+ assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+ assertEquals(0, subpartition.getBuffersInBacklog());
+ assertNoNextBuffer(readView);
+ assertEquals(0, subpartition.getBuffersInBacklog());
+
+ // some tests with events
+
+ // fill with: buffer, event, and buffer
+ subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
+ assertFalse(readView.nextBufferIsEvent());
+ subpartition.add(createEventBufferConsumer(BUFFER_SIZE));
+ assertFalse(readView.nextBufferIsEvent());
+ subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
+ assertFalse(readView.nextBufferIsEvent());
+
+ assertEquals(5, subpartition.getTotalNumberOfBuffers());
+ assertEquals(2, subpartition.getBuffersInBacklog()); // two buffers (events don't count)
+ assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+ assertEquals(4, availablityListener.getNumNotifications());
+
+ // the first buffer
+ assertNextBuffer(readView, BUFFER_SIZE, true, subpartition.getBuffersInBacklog() - 1, true, true);
+ assertEquals(3 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+ assertEquals(1, subpartition.getBuffersInBacklog());
+
+ // the event
+ assertNextEvent(readView, BUFFER_SIZE, null, true, subpartition.getBuffersInBacklog(), false, true);
+ assertEquals(4 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+ assertEquals(1, subpartition.getBuffersInBacklog());
+
+ // the remaining buffer
+ assertNextBuffer(readView, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
+ assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+ assertEquals(0, subpartition.getBuffersInBacklog());
+
+ // nothing more
+ assertNoNextBuffer(readView);
+ assertEquals(0, subpartition.getBuffersInBacklog());
+
+ assertEquals(5, subpartition.getTotalNumberOfBuffers());
+ assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
+ assertEquals(4, availablityListener.getNumNotifications());
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index 5989cf8..9f5e6d0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -217,7 +217,7 @@ public abstract class SubpartitionTestBase extends TestLogger {
assertEquals("recycled", expectedRecycledAfterRecycle, bufferAndBacklog.buffer().isRecycled());
}
- protected void assertNoNextBuffer(ResultSubpartitionView readView) throws IOException, InterruptedException {
+ static void assertNoNextBuffer(ResultSubpartitionView readView) throws IOException, InterruptedException {
assertNull(readView.getNextBuffer());
}
}