You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/19 11:42:52 UTC

[flink] 02/02: [FLINK-10332][network] move data notification out of the synchronized block

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 607c5675a108114254c747ca3565c6f03a98434e
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Tue Sep 18 12:10:21 2018 +0200

    [FLINK-10332][network] move data notification out of the synchronized block
    
    None of the notifications actually rely on being under the lock and may thus
    only cause lock contention.
    
    This closes #6693.
---
 .../network/partition/PipelinedSubpartition.java   | 44 +++++++++++-----------
 1 file changed, 22 insertions(+), 22 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index d2d7fdb..fe27d97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -88,6 +88,7 @@ class PipelinedSubpartition extends ResultSubpartition {
 	private boolean add(BufferConsumer bufferConsumer, boolean finish) {
 		checkNotNull(bufferConsumer);
 
+		final boolean notifyDataAvailable;
 		synchronized (buffers) {
 			if (isFinished || isReleased) {
 				bufferConsumer.close();
@@ -98,14 +99,13 @@ class PipelinedSubpartition extends ResultSubpartition {
 			buffers.add(bufferConsumer);
 			updateStatistics(bufferConsumer);
 			increaseBuffersInBacklog(bufferConsumer);
+			notifyDataAvailable = shouldNotifyDataAvailable() || finish;
 
-			if (finish) {
-				isFinished = true;
-				notifyDataAvailable();
-			}
-			else {
-				maybeNotifyDataAvailable();
-			}
+			isFinished |= finish;
+		}
+
+		if (notifyDataAvailable) {
+			notifyDataAvailable();
 		}
 
 		return true;
@@ -220,6 +220,7 @@ class PipelinedSubpartition extends ResultSubpartition {
 
 	@Override
 	public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException {
+		final boolean notifyDataAvailable;
 		synchronized (buffers) {
 			checkState(!isReleased);
 			checkState(readView == null,
@@ -230,9 +231,10 @@ class PipelinedSubpartition extends ResultSubpartition {
 				parent.getOwningTaskName(), index, parent.getPartitionId());
 
 			readView = new PipelinedSubpartitionView(this, availabilityListener);
-			if (!buffers.isEmpty()) {
-				notifyDataAvailable();
-			}
+			notifyDataAvailable = !buffers.isEmpty();
+		}
+		if (notifyDataAvailable) {
+			notifyDataAvailable();
 		}
 
 		return readView;
@@ -283,26 +285,24 @@ class PipelinedSubpartition extends ResultSubpartition {
 
 	@Override
 	public void flush() {
+		final boolean notifyDataAvailable;
 		synchronized (buffers) {
 			if (buffers.isEmpty()) {
 				return;
 			}
-			if (!flushRequested) {
-				flushRequested = true; // set this before the notification!
-				// if there is more then 1 buffer, we already notified the reader
-				// (at the latest when adding the second buffer)
-				if (buffers.size() == 1) {
-					notifyDataAvailable();
-				}
-			}
+			// if there is more then 1 buffer, we already notified the reader
+			// (at the latest when adding the second buffer)
+			notifyDataAvailable = !flushRequested && buffers.size() == 1;
+			flushRequested = true;
+		}
+		if (notifyDataAvailable) {
+			notifyDataAvailable();
 		}
 	}
 
-	private void maybeNotifyDataAvailable() {
+	private boolean shouldNotifyDataAvailable() {
 		// Notify only when we added first finished buffer.
-		if (getNumberOfFinishedBuffers() == 1) {
-			notifyDataAvailable();
-		}
+		return readView != null && !flushRequested && getNumberOfFinishedBuffers() == 1;
 	}
 
 	private void notifyDataAvailable() {