You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/19 19:32:55 UTC

[flink] 10/11: [hotfix][network] use ConcurrentMap#putIfAbsent and lambdas for partition request handlers

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bda07fc1f709f20ce5c3d52df36944e21b869b52
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Sun Aug 5 00:41:02 2018 +0200

    [hotfix][network] use ConcurrentMap#putIfAbsent and lambdas for partition request handlers
---
 .../netty/CreditBasedPartitionRequestClientHandler.java       | 11 ++---------
 .../io/network/netty/PartitionRequestClientHandler.java       |  4 +---
 .../flink/runtime/io/network/netty/PartitionRequestQueue.java |  7 +------
 3 files changed, 4 insertions(+), 18 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
index 90daf75..cc0b222 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
@@ -89,9 +89,7 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
 	public void addInputChannel(RemoteInputChannel listener) throws IOException {
 		checkError();
 
-		if (!inputChannels.containsKey(listener.getInputChannelId())) {
-			inputChannels.put(listener.getInputChannelId(), listener);
-		}
+		inputChannels.putIfAbsent(listener.getInputChannelId(), listener);
 	}
 
 	@Override
@@ -112,12 +110,7 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
 
 	@Override
 	public void notifyCreditAvailable(final RemoteInputChannel inputChannel) {
-		ctx.executor().execute(new Runnable() {
-			@Override
-			public void run() {
-				ctx.pipeline().fireUserEventTriggered(inputChannel);
-			}
-		});
+		ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(inputChannel));
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 796e86f..c5ba7a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -85,9 +85,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter impleme
 	public void addInputChannel(RemoteInputChannel listener) throws IOException {
 		checkError();
 
-		if (!inputChannels.containsKey(listener.getInputChannelId())) {
-			inputChannels.put(listener.getInputChannelId(), listener);
-		}
+		inputChannels.putIfAbsent(listener.getInputChannelId(), listener);
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index 8c05b82..c3d3d1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -89,12 +89,7 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 		// TODO This could potentially have a bad performance impact as in the
 		// worst case (network consumes faster than the producer) each buffer
 		// will trigger a separate event loop task being scheduled.
-		ctx.executor().execute(new Runnable() {
-			@Override
-			public void run() {
-				ctx.pipeline().fireUserEventTriggered(reader);
-			}
-		});
+		ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader));
 	}
 
 	/**