You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/08/30 20:58:17 UTC

[flink] 01/03: [hotfix][network] simplify moreAvailable/wasEmpty logic

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f574a8fc1fafb761983d7a04a72cd6181fe74276
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Tue Aug 21 17:16:29 2018 +0200

    [hotfix][network] simplify moreAvailable/wasEmpty logic
    
    If we only need to know whether a queue is empty, we do not need to query
    its size; checking isEmpty() is sufficient.
---
 .../io/network/partition/consumer/RemoteInputChannel.java      | 10 +++++-----
 .../runtime/io/network/partition/consumer/SingleInputGate.java |  2 +-
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 28f3020..79d25c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -191,16 +191,16 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
 		checkError();
 
 		final Buffer next;
-		final int remaining;
+		final boolean moreAvailable;
 
 		synchronized (receivedBuffers) {
 			next = receivedBuffers.poll();
-			remaining = receivedBuffers.size();
+			moreAvailable = !receivedBuffers.isEmpty();
 		}
 
 		numBytesIn.inc(next.getSizeUnsafe());
 		numBuffersIn.inc();
-		return Optional.of(new BufferAndAvailability(next, remaining > 0, getSenderBacklog()));
+		return Optional.of(new BufferAndAvailability(next, moreAvailable, getSenderBacklog()));
 	}
 
 	// ------------------------------------------------------------------------
@@ -516,12 +516,12 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
 			synchronized (receivedBuffers) {
 				if (!isReleased.get()) {
 					if (expectedSequenceNumber == sequenceNumber) {
-						int available = receivedBuffers.size();
+						final boolean wasEmpty = receivedBuffers.isEmpty();
 
 						receivedBuffers.add(buffer);
 						expectedSequenceNumber++;
 
-						if (available == 0) {
+						if (wasEmpty) {
 							notifyChannelNonEmpty();
 						}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 2e7d076..f51dc74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -545,7 +545,7 @@ public class SingleInputGate implements InputGate {
 
 				currentChannel = inputChannelsWithData.remove();
 				enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
-				moreAvailable = inputChannelsWithData.size() > 0;
+				moreAvailable = !inputChannelsWithData.isEmpty();
 			}
 
 			result = currentChannel.getNextBuffer();