You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/19 11:42:50 UTC

[flink] branch release-1.5 updated (915db25 -> 607c567)

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a change to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 915db25  [hotfix][network][tests] split PipelinedSubpartitionTest for better initialization
     new 23678c9  [hotfix][network] use ConcurrentMap#putIfAbsent and Lambdas for partition request handlers
     new 607c567  [FLINK-10332][network] move data notification out of the synchronized block

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../CreditBasedPartitionRequestClientHandler.java  | 11 +-----
 .../netty/PartitionRequestClientHandler.java       |  4 +-
 .../io/network/netty/PartitionRequestQueue.java    |  7 +---
 .../network/partition/PipelinedSubpartition.java   | 44 +++++++++++-----------
 4 files changed, 26 insertions(+), 40 deletions(-)


[flink] 01/02: [hotfix][network] use ConcurrentMap#putIfAbsent and Lambdas for partition request handlers

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 23678c9b2b198762134ec62c06add0a834b0c0e9
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Sun Aug 5 00:41:02 2018 +0200

    [hotfix][network] use ConcurrentMap#putIfAbsent and Lambdas for partition request handlers
---
 .../netty/CreditBasedPartitionRequestClientHandler.java       | 11 ++---------
 .../io/network/netty/PartitionRequestClientHandler.java       |  4 +---
 .../flink/runtime/io/network/netty/PartitionRequestQueue.java |  7 +------
 3 files changed, 4 insertions(+), 18 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
index 90daf75..cc0b222 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
@@ -89,9 +89,7 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
 	public void addInputChannel(RemoteInputChannel listener) throws IOException {
 		checkError();
 
-		if (!inputChannels.containsKey(listener.getInputChannelId())) {
-			inputChannels.put(listener.getInputChannelId(), listener);
-		}
+		inputChannels.putIfAbsent(listener.getInputChannelId(), listener);
 	}
 
 	@Override
@@ -112,12 +110,7 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
 
 	@Override
 	public void notifyCreditAvailable(final RemoteInputChannel inputChannel) {
-		ctx.executor().execute(new Runnable() {
-			@Override
-			public void run() {
-				ctx.pipeline().fireUserEventTriggered(inputChannel);
-			}
-		});
+		ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(inputChannel));
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 796e86f..c5ba7a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -85,9 +85,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter impleme
 	public void addInputChannel(RemoteInputChannel listener) throws IOException {
 		checkError();
 
-		if (!inputChannels.containsKey(listener.getInputChannelId())) {
-			inputChannels.put(listener.getInputChannelId(), listener);
-		}
+		inputChannels.putIfAbsent(listener.getInputChannelId(), listener);
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index 8c05b82..c3d3d1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -89,12 +89,7 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 		// TODO This could potentially have a bad performance impact as in the
 		// worst case (network consumes faster than the producer) each buffer
 		// will trigger a separate event loop task being scheduled.
-		ctx.executor().execute(new Runnable() {
-			@Override
-			public void run() {
-				ctx.pipeline().fireUserEventTriggered(reader);
-			}
-		});
+		ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader));
 	}
 
 	/**


[flink] 02/02: [FLINK-10332][network] move data notification out of the synchronized block

Posted by nk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 607c5675a108114254c747ca3565c6f03a98434e
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Tue Sep 18 12:10:21 2018 +0200

    [FLINK-10332][network] move data notification out of the synchronized block
    
    None of the notifications actually rely on being under the lock, so issuing
    them there may only cause lock contention.
    
    This closes #6693.
---
 .../network/partition/PipelinedSubpartition.java   | 44 +++++++++++-----------
 1 file changed, 22 insertions(+), 22 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index d2d7fdb..fe27d97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -88,6 +88,7 @@ class PipelinedSubpartition extends ResultSubpartition {
 	private boolean add(BufferConsumer bufferConsumer, boolean finish) {
 		checkNotNull(bufferConsumer);
 
+		final boolean notifyDataAvailable;
 		synchronized (buffers) {
 			if (isFinished || isReleased) {
 				bufferConsumer.close();
@@ -98,14 +99,13 @@ class PipelinedSubpartition extends ResultSubpartition {
 			buffers.add(bufferConsumer);
 			updateStatistics(bufferConsumer);
 			increaseBuffersInBacklog(bufferConsumer);
+			notifyDataAvailable = shouldNotifyDataAvailable() || finish;
 
-			if (finish) {
-				isFinished = true;
-				notifyDataAvailable();
-			}
-			else {
-				maybeNotifyDataAvailable();
-			}
+			isFinished |= finish;
+		}
+
+		if (notifyDataAvailable) {
+			notifyDataAvailable();
 		}
 
 		return true;
@@ -220,6 +220,7 @@ class PipelinedSubpartition extends ResultSubpartition {
 
 	@Override
 	public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException {
+		final boolean notifyDataAvailable;
 		synchronized (buffers) {
 			checkState(!isReleased);
 			checkState(readView == null,
@@ -230,9 +231,10 @@ class PipelinedSubpartition extends ResultSubpartition {
 				parent.getOwningTaskName(), index, parent.getPartitionId());
 
 			readView = new PipelinedSubpartitionView(this, availabilityListener);
-			if (!buffers.isEmpty()) {
-				notifyDataAvailable();
-			}
+			notifyDataAvailable = !buffers.isEmpty();
+		}
+		if (notifyDataAvailable) {
+			notifyDataAvailable();
 		}
 
 		return readView;
@@ -283,26 +285,24 @@ class PipelinedSubpartition extends ResultSubpartition {
 
 	@Override
 	public void flush() {
+		final boolean notifyDataAvailable;
 		synchronized (buffers) {
 			if (buffers.isEmpty()) {
 				return;
 			}
-			if (!flushRequested) {
-				flushRequested = true; // set this before the notification!
-				// if there is more then 1 buffer, we already notified the reader
-				// (at the latest when adding the second buffer)
-				if (buffers.size() == 1) {
-					notifyDataAvailable();
-				}
-			}
+			// if there is more then 1 buffer, we already notified the reader
+			// (at the latest when adding the second buffer)
+			notifyDataAvailable = !flushRequested && buffers.size() == 1;
+			flushRequested = true;
+		}
+		if (notifyDataAvailable) {
+			notifyDataAvailable();
 		}
 	}
 
-	private void maybeNotifyDataAvailable() {
+	private boolean shouldNotifyDataAvailable() {
 		// Notify only when we added first finished buffer.
-		if (getNumberOfFinishedBuffers() == 1) {
-			notifyDataAvailable();
-		}
+		return readView != null && !flushRequested && getNumberOfFinishedBuffers() == 1;
 	}
 
 	private void notifyDataAvailable() {