You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/19 11:42:51 UTC
[flink] 01/02: [hotfix][network] use ConcurrentMap#putIfAbsent and
Lambdas for partition request handlers
This is an automated email from the ASF dual-hosted git repository.
nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 23678c9b2b198762134ec62c06add0a834b0c0e9
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Sun Aug 5 00:41:02 2018 +0200
[hotfix][network] use ConcurrentMap#putIfAbsent and Lambdas for partition request handlers
---
.../netty/CreditBasedPartitionRequestClientHandler.java | 11 ++---------
.../io/network/netty/PartitionRequestClientHandler.java | 4 +---
.../flink/runtime/io/network/netty/PartitionRequestQueue.java | 7 +------
3 files changed, 4 insertions(+), 18 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
index 90daf75..cc0b222 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
@@ -89,9 +89,7 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
public void addInputChannel(RemoteInputChannel listener) throws IOException {
checkError();
- if (!inputChannels.containsKey(listener.getInputChannelId())) {
- inputChannels.put(listener.getInputChannelId(), listener);
- }
+ inputChannels.putIfAbsent(listener.getInputChannelId(), listener);
}
@Override
@@ -112,12 +110,7 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
@Override
public void notifyCreditAvailable(final RemoteInputChannel inputChannel) {
- ctx.executor().execute(new Runnable() {
- @Override
- public void run() {
- ctx.pipeline().fireUserEventTriggered(inputChannel);
- }
- });
+ ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(inputChannel));
}
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 796e86f..c5ba7a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -85,9 +85,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter impleme
public void addInputChannel(RemoteInputChannel listener) throws IOException {
checkError();
- if (!inputChannels.containsKey(listener.getInputChannelId())) {
- inputChannels.put(listener.getInputChannelId(), listener);
- }
+ inputChannels.putIfAbsent(listener.getInputChannelId(), listener);
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index 8c05b82..c3d3d1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -89,12 +89,7 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
// TODO This could potentially have a bad performance impact as in the
// worst case (network consumes faster than the producer) each buffer
// will trigger a separate event loop task being scheduled.
- ctx.executor().execute(new Runnable() {
- @Override
- public void run() {
- ctx.pipeline().fireUserEventTriggered(reader);
- }
- });
+ ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader));
}
/**