You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/10 13:23:37 UTC
[flink] 03/08: [hotfix][network] Refactor and simplify
InputGate#getNextBufferOrEvent
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5b84dbf423a380e8e6d44694cdeb06eb968353aa
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue May 7 12:26:23 2019 +0200
[hotfix][network] Refactor and simplify InputGate#getNextBufferOrEvent
Previously in case of more data available, re-enquing a channel or an inputGate
was done in a separate critical section, resulting with more complicated concurrency
contract (critical section split into two). Side effect of this change
is that now recursive getNextBuffer/pollNextBufferOrEvent are happening also
under the lock, however they are non-blocking, so this shouldn't cause any issues.
---
.../partition/consumer/SingleInputGate.java | 21 +++++++------
.../network/partition/consumer/UnionInputGate.java | 34 +++++++++++-----------
2 files changed, 27 insertions(+), 28 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index d40af83..750e678 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -552,21 +552,20 @@ public class SingleInputGate implements InputGate {
}
currentChannel = inputChannelsWithData.remove();
- enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
+
+ result = currentChannel.getNextBuffer();
+
+ if (result.isPresent() && result.get().moreAvailable()) {
+ // enqueue the currentChannel at the end to avoid starvation
+ inputChannelsWithData.add(currentChannel);
+ } else {
+ enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
+ }
+
moreAvailable = !inputChannelsWithData.isEmpty();
}
-
- result = currentChannel.getNextBuffer();
} while (!result.isPresent());
- // this channel was now removed from the non-empty channels queue
- // we re-add it in case it has more data, because in that case no "non-empty" notification
- // will come for that channel
- if (result.get().moreAvailable()) {
- queueChannel(currentChannel);
- moreAvailable = true;
- }
-
final Buffer buffer = result.get().buffer();
numBytesIn.inc(buffer.getSizeUnsafe());
if (buffer.isBuffer()) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 196743e..fcae79f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -184,13 +184,6 @@ public class UnionInputGate implements InputGate, InputGateListener {
InputGate inputGate = inputGateWithData.inputGate;
BufferOrEvent bufferOrEvent = inputGateWithData.bufferOrEvent;
- if (bufferOrEvent.moreAvailable()) {
- // this buffer or event was now removed from the non-empty gates queue
- // we re-add it in case it has more data, because in that case no "non-empty" notification
- // will come for that gate
- queueInputGate(inputGate);
- }
-
if (bufferOrEvent.isEvent()
&& bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
&& inputGate.isFinished()) {
@@ -213,8 +206,6 @@ public class UnionInputGate implements InputGate, InputGateListener {
private Optional<InputGateWithData> waitAndGetNextInputGate(boolean blocking) throws IOException, InterruptedException {
while (true) {
- InputGate inputGate;
- boolean moreInputGatesAvailable;
synchronized (inputGatesWithData) {
while (inputGatesWithData.size() == 0) {
if (blocking) {
@@ -223,15 +214,24 @@ public class UnionInputGate implements InputGate, InputGateListener {
return Optional.empty();
}
}
- inputGate = inputGatesWithData.remove();
- enqueuedInputGatesWithData.remove(inputGate);
- moreInputGatesAvailable = enqueuedInputGatesWithData.size() > 0;
- }
+ final InputGate inputGate = inputGatesWithData.remove();
+
+ // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data.
+ Optional<BufferOrEvent> bufferOrEvent = inputGate.pollNextBufferOrEvent();
- // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data.
- Optional<BufferOrEvent> bufferOrEvent = inputGate.pollNextBufferOrEvent();
- if (bufferOrEvent.isPresent()) {
- return Optional.of(new InputGateWithData(inputGate, bufferOrEvent.get(), moreInputGatesAvailable));
+ if (bufferOrEvent.isPresent() && bufferOrEvent.get().moreAvailable()) {
+ // enqueue the inputGate at the end to avoid starvation
+ inputGatesWithData.add(inputGate);
+ } else {
+ enqueuedInputGatesWithData.remove(inputGate);
+ }
+
+ if (bufferOrEvent.isPresent()) {
+ return Optional.of(new InputGateWithData(
+ inputGate,
+ bufferOrEvent.get(),
+ !enqueuedInputGatesWithData.isEmpty()));
+ }
}
}
}