You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/19 10:28:25 UTC
[flink] 08/09: [FLINK-10331][network] reduce unnecessary flushing
This is an automated email from the ASF dual-hosted git repository.
nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 16ea186fef3ac7eee8679f8e87132313508573b3
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Thu Sep 13 11:13:55 2018 +0200
[FLINK-10331][network] reduce unnecessary flushing
Do not flush (again) if
- a previous flush request has not been completely handled yet and/or is still enqueued or
- the network stack is still polling from this subpartition and doesn't need a new notification
This closes #6692.
---
.../network/partition/PipelinedSubpartition.java | 46 ++++++++++++++++------
.../partition/PipelinedSubpartitionTest.java | 45 +++++++++++++++++++++
2 files changed, 79 insertions(+), 12 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 91e0d4b..d2d7fdb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -36,6 +36,19 @@ import static org.apache.flink.util.Preconditions.checkState;
/**
* A pipelined in-memory only subpartition, which can be consumed once.
+ *
+ * <p>Whenever {@link #add(BufferConsumer)} adds a finished {@link BufferConsumer} or a second
+ * {@link BufferConsumer} (in which case we will assume the first one finished), we will
+ * {@link PipelinedSubpartitionView#notifyDataAvailable() notify} a read view created via
+ * {@link #createReadView(BufferAvailabilityListener)} of new data availability. Except by calling
+ * {@link #flush()} explicitly, we only ever notify when the first finished buffer turns up;
+ * afterwards, the reader has to drain the buffers via {@link #pollBuffer()} until its return value shows
+ * no more buffers being available. This results in a buffer queue which is either empty or has an
+ * unfinished {@link BufferConsumer} left from which the notifications will eventually start again.
+ *
+ * <p>Explicit calls to {@link #flush()} will force this
+ * {@link PipelinedSubpartitionView#notifyDataAvailable() notification} for any
+ * {@link BufferConsumer} present in the queue.
*/
class PipelinedSubpartition extends ResultSubpartition {
@@ -67,17 +80,6 @@ class PipelinedSubpartition extends ResultSubpartition {
}
@Override
- public void flush() {
- synchronized (buffers) {
- if (buffers.isEmpty()) {
- return;
- }
- flushRequested = !buffers.isEmpty();
- notifyDataAvailable();
- }
- }
-
- @Override
public void finish() throws IOException {
add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
LOG.debug("{}: Finished {}.", parent.getOwningTaskName(), this);
@@ -99,7 +101,7 @@ class PipelinedSubpartition extends ResultSubpartition {
if (finish) {
isFinished = true;
- flush();
+ notifyDataAvailable();
}
else {
maybeNotifyDataAvailable();
@@ -279,6 +281,23 @@ class PipelinedSubpartition extends ResultSubpartition {
return Math.max(buffers.size(), 0);
}
+ @Override
+ public void flush() {
+ synchronized (buffers) {
+ if (buffers.isEmpty()) {
+ return;
+ }
+ if (!flushRequested) {
+ flushRequested = true; // set this before the notification!
+ // if there is more than 1 buffer, we already notified the reader
+ // (at the latest when adding the second buffer)
+ if (buffers.size() == 1) {
+ notifyDataAvailable();
+ }
+ }
+ }
+ }
+
private void maybeNotifyDataAvailable() {
// Notify only when we added first finished buffer.
if (getNumberOfFinishedBuffers() == 1) {
@@ -295,6 +314,9 @@ class PipelinedSubpartition extends ResultSubpartition {
private int getNumberOfFinishedBuffers() {
assert Thread.holdsLock(buffers);
+ // NOTE: isFinished() is not guaranteed to provide the most up-to-date state here
+ // worst-case: a single finished buffer sits around until the next flush() call
+ // (but we do not offer stronger guarantees anyway)
if (buffers.size() == 1 && buffers.peekLast().isFinished()) {
return 1;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 90bdb82..b75bb7a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -47,6 +47,8 @@ import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.
import static org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE;
import static org.apache.flink.util.FutureUtil.waitForAll;
import static org.apache.flink.util.Preconditions.checkState;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -160,7 +162,11 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
subpartition.add(createFilledBufferConsumer(1025)); // finished
subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+ assertThat(availablityListener.getNumNotifications(), greaterThan(0L));
assertNextBuffer(readView, 1025, false, 1, false, true);
+ // not notified, but we could still access the unfinished buffer
+ assertNextBuffer(readView, 1024, false, 1, false, false);
+ assertNoNextBuffer(readView);
} finally {
subpartition.release();
}
@@ -179,10 +185,49 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
try {
subpartition.add(createFilledBufferConsumer(1025)); // finished
subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+ long oldNumNotifications = availablityListener.getNumNotifications();
subpartition.flush();
+ // buffer queue size is > 1, so the reader was already notified; no further notification necessary
+ assertThat(oldNumNotifications, greaterThan(0L));
+ assertEquals(oldNumNotifications, availablityListener.getNumNotifications());
assertNextBuffer(readView, 1025, true, 1, false, true);
assertNextBuffer(readView, 1024, false, 1, false, false);
+ assertNoNextBuffer(readView);
+ } finally {
+ subpartition.release();
+ }
+ }
+
+ /**
+ * A flush call with a buffer queue size of 1 should always notify consumers (unless already flushed).
+ */
+ @Test
+ public void testFlushWithUnfinishedBufferBehindFinished2() throws Exception {
+ final ResultSubpartition subpartition = createSubpartition();
+ AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
+ ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
+
+ try {
+ // no buffers -> no notification or any other effects
+ subpartition.flush();
+ assertEquals(0, availablityListener.getNumNotifications());
+
+ subpartition.add(createFilledBufferConsumer(1025)); // finished
+ subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+
+ assertNextBuffer(readView, 1025, false, 1, false, true);
+
+ long oldNumNotifications = availablityListener.getNumNotifications();
+ subpartition.flush();
+ // buffer queue size is 1 again -> need to flush
+ assertEquals(oldNumNotifications + 1, availablityListener.getNumNotifications());
+ subpartition.flush();
+ // calling again should not flush again
+ assertEquals(oldNumNotifications + 1, availablityListener.getNumNotifications());
+
+ assertNextBuffer(readView, 1024, false, 1, false, false);
+ assertNoNextBuffer(readView);
} finally {
subpartition.release();
}