You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/10 13:23:39 UTC
[flink] 05/08: [hotfix][network] Replace inputGatesWithData and
enqueuedInputGatesWithData fields with single LinkedHashSet
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a0751329da85dba31c5959c78cee7598dcddaabe
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue May 7 14:28:21 2019 +0200
[hotfix][network] Replace inputGatesWithData and enqueuedInputGatesWithData fields with single LinkedHashSet
---
.../network/partition/consumer/UnionInputGate.java | 24 ++++++++++------------
1 file changed, 11 insertions(+), 13 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index fcae79f..7d457ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -25,8 +25,8 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -70,13 +70,11 @@ public class UnionInputGate implements InputGate, InputGateListener {
private final Set<InputGate> inputGatesWithRemainingData;
- /** Gates, which notified this input gate about available data. */
- private final ArrayDeque<InputGate> inputGatesWithData = new ArrayDeque<>();
-
/**
- * Guardian against enqueuing an {@link InputGate} multiple times on {@code inputGatesWithData}.
+ * Gates that have notified this input gate about available data. This set is used as a FIFO
+ * queue of {@link InputGate}s to avoid starvation and provide some basic fairness.
*/
- private final Set<InputGate> enqueuedInputGatesWithData = new HashSet<>();
+ private final LinkedHashSet<InputGate> inputGatesWithData = new LinkedHashSet<>();
/** The total number of input channels across all unioned input gates. */
private final int totalNumberOfInputChannels;
@@ -214,7 +212,10 @@ public class UnionInputGate implements InputGate, InputGateListener {
return Optional.empty();
}
}
- final InputGate inputGate = inputGatesWithData.remove();
+
+ Iterator<InputGate> inputGateIterator = inputGatesWithData.iterator();
+ final InputGate inputGate = inputGateIterator.next();
+ inputGateIterator.remove();
// In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data.
Optional<BufferOrEvent> bufferOrEvent = inputGate.pollNextBufferOrEvent();
@@ -222,15 +223,13 @@ public class UnionInputGate implements InputGate, InputGateListener {
if (bufferOrEvent.isPresent() && bufferOrEvent.get().moreAvailable()) {
// enqueue the inputGate at the end to avoid starvation
inputGatesWithData.add(inputGate);
- } else {
- enqueuedInputGatesWithData.remove(inputGate);
}
if (bufferOrEvent.isPresent()) {
return Optional.of(new InputGateWithData(
inputGate,
bufferOrEvent.get(),
- !enqueuedInputGatesWithData.isEmpty()));
+ !inputGatesWithData.isEmpty()));
}
}
}
@@ -290,14 +289,13 @@ public class UnionInputGate implements InputGate, InputGateListener {
int availableInputGates;
synchronized (inputGatesWithData) {
- if (enqueuedInputGatesWithData.contains(inputGate)) {
+ if (inputGatesWithData.contains(inputGate)) {
return;
}
availableInputGates = inputGatesWithData.size();
inputGatesWithData.add(inputGate);
- enqueuedInputGatesWithData.add(inputGate);
if (availableInputGates == 0) {
inputGatesWithData.notifyAll();