You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/08/30 20:58:17 UTC
[flink] 01/03: [hotfix][network] simplify moreAvailable/wasEmpty
logic
This is an automated email from the ASF dual-hosted git repository.
nkruber pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f574a8fc1fafb761983d7a04a72cd6181fe74276
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Tue Aug 21 17:16:29 2018 +0200
[hotfix][network] simplify moreAvailable/wasEmpty logic
If we only need the status of a queue being empty or not, we do not need to
acquire the size.
---
.../io/network/partition/consumer/RemoteInputChannel.java | 10 +++++-----
.../runtime/io/network/partition/consumer/SingleInputGate.java | 2 +-
2 files changed, 6 insertions(+), 6 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 28f3020..79d25c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -191,16 +191,16 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
checkError();
final Buffer next;
- final int remaining;
+ final boolean moreAvailable;
synchronized (receivedBuffers) {
next = receivedBuffers.poll();
- remaining = receivedBuffers.size();
+ moreAvailable = !receivedBuffers.isEmpty();
}
numBytesIn.inc(next.getSizeUnsafe());
numBuffersIn.inc();
- return Optional.of(new BufferAndAvailability(next, remaining > 0, getSenderBacklog()));
+ return Optional.of(new BufferAndAvailability(next, moreAvailable, getSenderBacklog()));
}
// ------------------------------------------------------------------------
@@ -516,12 +516,12 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
synchronized (receivedBuffers) {
if (!isReleased.get()) {
if (expectedSequenceNumber == sequenceNumber) {
- int available = receivedBuffers.size();
+ final boolean wasEmpty = receivedBuffers.isEmpty();
receivedBuffers.add(buffer);
expectedSequenceNumber++;
- if (available == 0) {
+ if (wasEmpty) {
notifyChannelNonEmpty();
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 2e7d076..f51dc74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -545,7 +545,7 @@ public class SingleInputGate implements InputGate {
currentChannel = inputChannelsWithData.remove();
enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
- moreAvailable = inputChannelsWithData.size() > 0;
+ moreAvailable = !inputChannelsWithData.isEmpty();
}
result = currentChannel.getNextBuffer();