You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/10 13:23:41 UTC
[flink] 07/08: [hotfix][network] Refactor InputGates code
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 261efa8f6a321a9303705c35681c0d67880213bb
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu May 9 10:42:33 2019 +0200
[hotfix][network] Refactor InputGates code
---
.../io/network/partition/consumer/InputGate.java | 17 ++++++
.../partition/consumer/SingleInputGate.java | 51 +++++++++++------
.../network/partition/consumer/UnionInputGate.java | 64 +++++++++++-----------
3 files changed, 83 insertions(+), 49 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 83e18e9..03ac822 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -24,6 +24,8 @@ import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* An input gate consumes one or more partitions of a single produced intermediate result.
*
@@ -112,4 +114,19 @@ public abstract class InputGate implements AutoCloseable {
isAvailable = new CompletableFuture<>();
}
}
+
+ /**
+ * Simple POJO bundling an input, the data read from that input and a flag telling whether more data is available.
+ */
+ protected static class InputWithData<INPUT, DATA> {
+ protected final INPUT input;
+ protected final DATA data;
+ protected final boolean moreAvailable;
+
+ InputWithData(INPUT input, DATA data, boolean moreAvailable) {
+ this.input = checkNotNull(input);
+ this.data = checkNotNull(data);
+ this.moreAvailable = moreAvailable;
+ }
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index c0830fa..19912b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -529,12 +529,21 @@ public class SingleInputGate extends InputGate {
}
requestPartitions();
+ Optional<InputWithData<InputChannel, BufferAndAvailability>> next = waitAndGetNextData(blocking);
+ if (!next.isPresent()) {
+ return Optional.empty();
+ }
- InputChannel currentChannel;
- boolean moreAvailable;
- Optional<BufferAndAvailability> result = Optional.empty();
+ InputWithData<InputChannel, BufferAndAvailability> inputWithData = next.get();
+ return Optional.of(transformToBufferOrEvent(
+ inputWithData.data.buffer(),
+ inputWithData.moreAvailable,
+ inputWithData.input));
+ }
- do {
+ private Optional<InputWithData<InputChannel, BufferAndAvailability>> waitAndGetNextData(boolean blocking)
+ throws IOException, InterruptedException {
+ while (true) {
synchronized (inputChannelsWithData) {
while (inputChannelsWithData.size() == 0) {
if (isReleased) {
@@ -550,29 +559,38 @@ public class SingleInputGate extends InputGate {
}
}
- currentChannel = inputChannelsWithData.remove();
+ InputChannel inputChannel = inputChannelsWithData.remove();
- result = currentChannel.getNextBuffer();
+ Optional<BufferAndAvailability> result = inputChannel.getNextBuffer();
if (result.isPresent() && result.get().moreAvailable()) {
- // enqueue the currentChannel at the end to avoid starvation
- inputChannelsWithData.add(currentChannel);
+ // enqueue the inputChannel at the end to avoid starvation
+ inputChannelsWithData.add(inputChannel);
} else {
- enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
+ enqueuedInputChannelsWithData.clear(inputChannel.getChannelIndex());
}
- moreAvailable = !inputChannelsWithData.isEmpty();
-
- if (!moreAvailable) {
+ if (inputChannelsWithData.isEmpty()) {
resetIsAvailable();
}
+
+ if (result.isPresent()) {
+ return Optional.of(new InputWithData<>(
+ inputChannel,
+ result.get(),
+ !inputChannelsWithData.isEmpty()));
+ }
}
- } while (!result.isPresent());
+ }
+ }
- final Buffer buffer = result.get().buffer();
+ private BufferOrEvent transformToBufferOrEvent(
+ Buffer buffer,
+ boolean moreAvailable,
+ InputChannel currentChannel) throws IOException, InterruptedException {
numBytesIn.inc(buffer.getSizeUnsafe());
if (buffer.isBuffer()) {
- return Optional.of(new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable));
+ return new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable);
}
else {
final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
@@ -591,11 +609,10 @@ public class SingleInputGate extends InputGate {
}
currentChannel.notifySubpartitionConsumed();
-
currentChannel.releaseAllResources();
}
- return Optional.of(new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable));
+ return new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 986d1bb..5019cfc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -182,36 +182,22 @@ public class UnionInputGate extends InputGate {
// Make sure to request the partitions, if they have not been requested before.
requestPartitions();
- Optional<InputGateWithData> next = waitAndGetNextInputGate(blocking);
+ Optional<InputWithData<InputGate, BufferOrEvent>> next = waitAndGetNextData(blocking);
if (!next.isPresent()) {
return Optional.empty();
}
- InputGateWithData inputGateWithData = next.get();
- InputGate inputGate = inputGateWithData.inputGate;
- BufferOrEvent bufferOrEvent = inputGateWithData.bufferOrEvent;
+ InputWithData<InputGate, BufferOrEvent> inputWithData = next.get();
- if (bufferOrEvent.isEvent()
- && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
- && inputGate.isFinished()) {
-
- checkState(!bufferOrEvent.moreAvailable());
- if (!inputGatesWithRemainingData.remove(inputGate)) {
- throw new IllegalStateException("Couldn't find input gate in set of remaining " +
- "input gates.");
- }
- }
-
- // Set the channel index to identify the input channel (across all unioned input gates)
- final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate);
-
- bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex());
- bufferOrEvent.setMoreAvailable(bufferOrEvent.moreAvailable() || inputGateWithData.moreInputGatesAvailable);
-
- return Optional.of(bufferOrEvent);
+ handleEndOfPartitionEvent(inputWithData.data, inputWithData.input);
+ return Optional.of(adjustForUnionInputGate(
+ inputWithData.data,
+ inputWithData.input,
+ inputWithData.moreAvailable));
}
- private Optional<InputGateWithData> waitAndGetNextInputGate(boolean blocking) throws IOException, InterruptedException {
+ private Optional<InputWithData<InputGate, BufferOrEvent>> waitAndGetNextData(boolean blocking)
+ throws IOException, InterruptedException {
while (true) {
synchronized (inputGatesWithData) {
while (inputGatesWithData.size() == 0) {
@@ -242,7 +228,7 @@ public class UnionInputGate extends InputGate {
}
if (bufferOrEvent.isPresent()) {
- return Optional.of(new InputGateWithData(
+ return Optional.of(new InputWithData<>(
inputGate,
bufferOrEvent.get(),
!inputGatesWithData.isEmpty()));
@@ -251,15 +237,29 @@ public class UnionInputGate extends InputGate {
}
}
- private static class InputGateWithData {
- private final InputGate inputGate;
- private final BufferOrEvent bufferOrEvent;
- private final boolean moreInputGatesAvailable;
+ private BufferOrEvent adjustForUnionInputGate(
+ BufferOrEvent bufferOrEvent,
+ InputGate inputGate,
+ boolean moreInputGatesAvailable) {
+ // Set the channel index to identify the input channel (across all unioned input gates)
+ final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate);
+
+ bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex());
+ bufferOrEvent.setMoreAvailable(bufferOrEvent.moreAvailable() || moreInputGatesAvailable);
- InputGateWithData(InputGate inputGate, BufferOrEvent bufferOrEvent, boolean moreInputGatesAvailable) {
- this.inputGate = checkNotNull(inputGate);
- this.bufferOrEvent = checkNotNull(bufferOrEvent);
- this.moreInputGatesAvailable = moreInputGatesAvailable;
+ return bufferOrEvent;
+ }
+
+ private void handleEndOfPartitionEvent(BufferOrEvent bufferOrEvent, InputGate inputGate) {
+ if (bufferOrEvent.isEvent()
+ && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
+ && inputGate.isFinished()) {
+
+ checkState(!bufferOrEvent.moreAvailable());
+ if (!inputGatesWithRemainingData.remove(inputGate)) {
+ throw new IllegalStateException("Couldn't find input gate in set of remaining " +
+ "input gates.");
+ }
}
}