You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/19 10:28:25 UTC

[flink] 08/09: [FLINK-10331][network] reduce unnecessary flushing

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 16ea186fef3ac7eee8679f8e87132313508573b3
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Thu Sep 13 11:13:55 2018 +0200

    [FLINK-10331][network] reduce unnecessary flushing
    
    Do not flush (again) if
    - a previous flush request has not been completely handled yet and/or is still enqueued or
    - the network stack is still polling from this subpartition and doesn't need a new notification
    
    This closes #6692.
---
 .../network/partition/PipelinedSubpartition.java   | 46 ++++++++++++++++------
 .../partition/PipelinedSubpartitionTest.java       | 45 +++++++++++++++++++++
 2 files changed, 79 insertions(+), 12 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 91e0d4b..d2d7fdb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -36,6 +36,19 @@ import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A pipelined in-memory only subpartition, which can be consumed once.
+ *
+ * <p>Whenever {@link #add(BufferConsumer)} adds a finished {@link BufferConsumer} or a second
+ * {@link BufferConsumer} (in which case we will assume the first one finished), we will
+ * {@link PipelinedSubpartitionView#notifyDataAvailable() notify} a read view created via
+ * {@link #createReadView(BufferAvailabilityListener)} of new data availability. Unless
+ * {@link #flush()} is called explicitly, we only notify when the first finished buffer turns up;
+ * the reader then has to drain the buffers via {@link #pollBuffer()} until its return value shows
+ * no more buffers being available. This results in a buffer queue which is either empty or has an
+ * unfinished {@link BufferConsumer} left from which the notifications will eventually start again.
+ *
+ * <p>Explicit calls to {@link #flush()} will force this
+ * {@link PipelinedSubpartitionView#notifyDataAvailable() notification} for any
+ * {@link BufferConsumer} present in the queue.
  */
 class PipelinedSubpartition extends ResultSubpartition {
 
@@ -67,17 +80,6 @@ class PipelinedSubpartition extends ResultSubpartition {
 	}
 
 	@Override
-	public void flush() {
-		synchronized (buffers) {
-			if (buffers.isEmpty()) {
-				return;
-			}
-			flushRequested = !buffers.isEmpty();
-			notifyDataAvailable();
-		}
-	}
-
-	@Override
 	public void finish() throws IOException {
 		add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
 		LOG.debug("{}: Finished {}.", parent.getOwningTaskName(), this);
@@ -99,7 +101,7 @@ class PipelinedSubpartition extends ResultSubpartition {
 
 			if (finish) {
 				isFinished = true;
-				flush();
+				notifyDataAvailable();
 			}
 			else {
 				maybeNotifyDataAvailable();
@@ -279,6 +281,23 @@ class PipelinedSubpartition extends ResultSubpartition {
 		return Math.max(buffers.size(), 0);
 	}
 
+	@Override
+	public void flush() {
+		synchronized (buffers) {
+			if (buffers.isEmpty()) {
+				return;
+			}
+			if (!flushRequested) {
+				flushRequested = true; // set this before the notification!
+				// if there is more than 1 buffer, we already notified the reader
+				// (at the latest when adding the second buffer)
+				if (buffers.size() == 1) {
+					notifyDataAvailable();
+				}
+			}
+		}
+	}
+
 	private void maybeNotifyDataAvailable() {
 		// Notify only when we added first finished buffer.
 		if (getNumberOfFinishedBuffers() == 1) {
@@ -295,6 +314,9 @@ class PipelinedSubpartition extends ResultSubpartition {
 	private int getNumberOfFinishedBuffers() {
 		assert Thread.holdsLock(buffers);
 
+		// NOTE: isFinished() is not guaranteed to provide the most up-to-date state here
+		// worst-case: a single finished buffer sits around until the next flush() call
+		// (but we do not offer stronger guarantees anyway)
 		if (buffers.size() == 1 && buffers.peekLast().isFinished()) {
 			return 1;
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 90bdb82..b75bb7a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -47,6 +47,8 @@ import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.
 import static org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE;
 import static org.apache.flink.util.FutureUtil.waitForAll;
 import static org.apache.flink.util.Preconditions.checkState;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -160,7 +162,11 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 			subpartition.add(createFilledBufferConsumer(1025)); // finished
 			subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
 
+			assertThat(availablityListener.getNumNotifications(), greaterThan(0L));
 			assertNextBuffer(readView, 1025, false, 1, false, true);
+			// not notified, but we could still access the unfinished buffer
+			assertNextBuffer(readView, 1024, false, 1, false, false);
+			assertNoNextBuffer(readView);
 		} finally {
 			subpartition.release();
 		}
@@ -179,10 +185,49 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		try {
 			subpartition.add(createFilledBufferConsumer(1025)); // finished
 			subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+			long oldNumNotifications = availablityListener.getNumNotifications();
 			subpartition.flush();
+			// buffer queue is > 1, should already be notified, no further notification necessary
+			assertThat(oldNumNotifications, greaterThan(0L));
+			assertEquals(oldNumNotifications, availablityListener.getNumNotifications());
 
 			assertNextBuffer(readView, 1025, true, 1, false, true);
 			assertNextBuffer(readView, 1024, false, 1, false, false);
+			assertNoNextBuffer(readView);
+		} finally {
+			subpartition.release();
+		}
+	}
+
+	/**
+	 * A flush call with a buffer size of 1 should always notify consumers (unless already flushed).
+	 */
+	@Test
+	public void testFlushWithUnfinishedBufferBehindFinished2() throws Exception {
+		final ResultSubpartition subpartition = createSubpartition();
+		AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
+		ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
+
+		try {
+			// no buffers -> no notification or any other effects
+			subpartition.flush();
+			assertEquals(0, availablityListener.getNumNotifications());
+
+			subpartition.add(createFilledBufferConsumer(1025)); // finished
+			subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+
+			assertNextBuffer(readView, 1025, false, 1, false, true);
+
+			long oldNumNotifications = availablityListener.getNumNotifications();
+			subpartition.flush();
+			// buffer queue is 1 again -> need to flush
+			assertEquals(oldNumNotifications + 1, availablityListener.getNumNotifications());
+			subpartition.flush();
+			// calling again should not flush again
+			assertEquals(oldNumNotifications + 1, availablityListener.getNumNotifications());
+
+			assertNextBuffer(readView, 1024, false, 1, false, false);
+			assertNoNextBuffer(readView);
 		} finally {
 			subpartition.release();
 		}