You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/10 13:23:37 UTC

[flink] 03/08: [hotfix][network] Refactor and simplify InputGate#getNextBufferOrEvent

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5b84dbf423a380e8e6d44694cdeb06eb968353aa
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue May 7 12:26:23 2019 +0200

    [hotfix][network] Refactor and simplify InputGate#getNextBufferOrEvent
    
    Previously, when more data was available, re-enqueuing a channel or an inputGate
    was done in a separate critical section, resulting in a more complicated concurrency
    contract (the critical section was split into two). A side effect of this change
    is that the recursive getNextBuffer/pollNextBufferOrEvent calls now also happen
    under the lock; however, they are non-blocking, so this shouldn't cause any issues.
---
 .../partition/consumer/SingleInputGate.java        | 21 +++++++------
 .../network/partition/consumer/UnionInputGate.java | 34 +++++++++++-----------
 2 files changed, 27 insertions(+), 28 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index d40af83..750e678 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -552,21 +552,20 @@ public class SingleInputGate implements InputGate {
 				}
 
 				currentChannel = inputChannelsWithData.remove();
-				enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
+
+				result = currentChannel.getNextBuffer();
+
+				if (result.isPresent() && result.get().moreAvailable()) {
+					// enqueue the currentChannel at the end to avoid starvation
+					inputChannelsWithData.add(currentChannel);
+				} else {
+					enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
+				}
+
 				moreAvailable = !inputChannelsWithData.isEmpty();
 			}
-
-			result = currentChannel.getNextBuffer();
 		} while (!result.isPresent());
 
-		// this channel was now removed from the non-empty channels queue
-		// we re-add it in case it has more data, because in that case no "non-empty" notification
-		// will come for that channel
-		if (result.get().moreAvailable()) {
-			queueChannel(currentChannel);
-			moreAvailable = true;
-		}
-
 		final Buffer buffer = result.get().buffer();
 		numBytesIn.inc(buffer.getSizeUnsafe());
 		if (buffer.isBuffer()) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 196743e..fcae79f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -184,13 +184,6 @@ public class UnionInputGate implements InputGate, InputGateListener {
 		InputGate inputGate = inputGateWithData.inputGate;
 		BufferOrEvent bufferOrEvent = inputGateWithData.bufferOrEvent;
 
-		if (bufferOrEvent.moreAvailable()) {
-			// this buffer or event was now removed from the non-empty gates queue
-			// we re-add it in case it has more data, because in that case no "non-empty" notification
-			// will come for that gate
-			queueInputGate(inputGate);
-		}
-
 		if (bufferOrEvent.isEvent()
 			&& bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
 			&& inputGate.isFinished()) {
@@ -213,8 +206,6 @@ public class UnionInputGate implements InputGate, InputGateListener {
 
 	private Optional<InputGateWithData> waitAndGetNextInputGate(boolean blocking) throws IOException, InterruptedException {
 		while (true) {
-			InputGate inputGate;
-			boolean moreInputGatesAvailable;
 			synchronized (inputGatesWithData) {
 				while (inputGatesWithData.size() == 0) {
 					if (blocking) {
@@ -223,15 +214,24 @@ public class UnionInputGate implements InputGate, InputGateListener {
 						return Optional.empty();
 					}
 				}
-				inputGate = inputGatesWithData.remove();
-				enqueuedInputGatesWithData.remove(inputGate);
-				moreInputGatesAvailable = enqueuedInputGatesWithData.size() > 0;
-			}
+				final InputGate inputGate = inputGatesWithData.remove();
+
+				// In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data.
+				Optional<BufferOrEvent> bufferOrEvent = inputGate.pollNextBufferOrEvent();
 
-			// In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data.
-			Optional<BufferOrEvent> bufferOrEvent = inputGate.pollNextBufferOrEvent();
-			if (bufferOrEvent.isPresent()) {
-				return Optional.of(new InputGateWithData(inputGate, bufferOrEvent.get(), moreInputGatesAvailable));
+				if (bufferOrEvent.isPresent() && bufferOrEvent.get().moreAvailable()) {
+					// enqueue the inputGate at the end to avoid starvation
+					inputGatesWithData.add(inputGate);
+				} else {
+					enqueuedInputGatesWithData.remove(inputGate);
+				}
+
+				if (bufferOrEvent.isPresent()) {
+					return Optional.of(new InputGateWithData(
+						inputGate,
+						bufferOrEvent.get(),
+						!enqueuedInputGatesWithData.isEmpty()));
+				}
 			}
 		}
 	}