You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/10 13:23:39 UTC

[flink] 05/08: [hotfix][network] Replace inputGatesWithData and enqueuedInputGatesWithData fields with single LinkedHashSet

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a0751329da85dba31c5959c78cee7598dcddaabe
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue May 7 14:28:21 2019 +0200

    [hotfix][network] Replace inputGatesWithData and enqueuedInputGatesWithData fields with single LinkedHashSet
---
 .../network/partition/consumer/UnionInputGate.java | 24 ++++++++++------------
 1 file changed, 11 insertions(+), 13 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index fcae79f..7d457ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -25,8 +25,8 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
 import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
 
 import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -70,13 +70,11 @@ public class UnionInputGate implements InputGate, InputGateListener {
 
 	private final Set<InputGate> inputGatesWithRemainingData;
 
-	/** Gates, which notified this input gate about available data. */
-	private final ArrayDeque<InputGate> inputGatesWithData = new ArrayDeque<>();
-
 	/**
-	 * Guardian against enqueuing an {@link InputGate} multiple times on {@code inputGatesWithData}.
+	 * Gates that have notified this input gate about available data. The set is used as a FIFO
+	 * queue of {@link InputGate}s to avoid starvation and provide some basic fairness.
 	 */
-	private final Set<InputGate> enqueuedInputGatesWithData = new HashSet<>();
+	private final LinkedHashSet<InputGate> inputGatesWithData = new LinkedHashSet<>();
 
 	/** The total number of input channels across all unioned input gates. */
 	private final int totalNumberOfInputChannels;
@@ -214,7 +212,10 @@ public class UnionInputGate implements InputGate, InputGateListener {
 						return Optional.empty();
 					}
 				}
-				final InputGate inputGate = inputGatesWithData.remove();
+
+				Iterator<InputGate> inputGateIterator = inputGatesWithData.iterator();
+				final InputGate inputGate = inputGateIterator.next();
+				inputGateIterator.remove();
 
 				// In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data.
 				Optional<BufferOrEvent> bufferOrEvent = inputGate.pollNextBufferOrEvent();
@@ -222,15 +223,13 @@ public class UnionInputGate implements InputGate, InputGateListener {
 				if (bufferOrEvent.isPresent() && bufferOrEvent.get().moreAvailable()) {
 					// enqueue the inputGate at the end to avoid starvation
 					inputGatesWithData.add(inputGate);
-				} else {
-					enqueuedInputGatesWithData.remove(inputGate);
 				}
 
 				if (bufferOrEvent.isPresent()) {
 					return Optional.of(new InputGateWithData(
 						inputGate,
 						bufferOrEvent.get(),
-						!enqueuedInputGatesWithData.isEmpty()));
+						!inputGatesWithData.isEmpty()));
 				}
 			}
 		}
@@ -290,14 +289,13 @@ public class UnionInputGate implements InputGate, InputGateListener {
 		int availableInputGates;
 
 		synchronized (inputGatesWithData) {
-			if (enqueuedInputGatesWithData.contains(inputGate)) {
+			if (inputGatesWithData.contains(inputGate)) {
 				return;
 			}
 
 			availableInputGates = inputGatesWithData.size();
 
 			inputGatesWithData.add(inputGate);
-			enqueuedInputGatesWithData.add(inputGate);
 
 			if (availableInputGates == 0) {
 				inputGatesWithData.notifyAll();