You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/10 13:23:36 UTC
[flink] 02/08: [hotfix][network] Implement UnionInputGate#pollNextBufferOrEvent method
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit cd13fdfa0c8b0db7c0087d9fe40f1522ccb5d122
Author: sunhaibotb <su...@163.com>
AuthorDate: Tue May 7 13:52:50 2019 +0200
[hotfix][network] Implement UnionInputGate#pollNextBufferOrEvent method
---
.../network/partition/consumer/UnionInputGate.java | 31 +++++++++++++++-------
.../partition/consumer/SingleInputGateTest.java | 12 +--------
2 files changed, 23 insertions(+), 20 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index ea83004..196743e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -159,6 +159,15 @@ public class UnionInputGate implements InputGate, InputGateListener {
@Override
public Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException {
+ return getNextBufferOrEvent(true);
+ }
+
+ @Override
+ public Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, InterruptedException {
+ return getNextBufferOrEvent(false);
+ }
+
+ private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException {
if (inputGatesWithRemainingData.isEmpty()) {
return Optional.empty();
}
@@ -166,7 +175,12 @@ public class UnionInputGate implements InputGate, InputGateListener {
// Make sure to request the partitions, if they have not been requested before.
requestPartitions();
- InputGateWithData inputGateWithData = waitAndGetNextInputGate();
+ Optional<InputGateWithData> next = waitAndGetNextInputGate(blocking);
+ if (!next.isPresent()) {
+ return Optional.empty();
+ }
+
+ InputGateWithData inputGateWithData = next.get();
InputGate inputGate = inputGateWithData.inputGate;
BufferOrEvent bufferOrEvent = inputGateWithData.bufferOrEvent;
@@ -197,18 +211,17 @@ public class UnionInputGate implements InputGate, InputGateListener {
return Optional.of(bufferOrEvent);
}
- @Override
- public Optional<BufferOrEvent> pollNextBufferOrEvent() throws UnsupportedOperationException {
- throw new UnsupportedOperationException();
- }
-
- private InputGateWithData waitAndGetNextInputGate() throws IOException, InterruptedException {
+ private Optional<InputGateWithData> waitAndGetNextInputGate(boolean blocking) throws IOException, InterruptedException {
while (true) {
InputGate inputGate;
boolean moreInputGatesAvailable;
synchronized (inputGatesWithData) {
while (inputGatesWithData.size() == 0) {
- inputGatesWithData.wait();
+ if (blocking) {
+ inputGatesWithData.wait();
+ } else {
+ return Optional.empty();
+ }
}
inputGate = inputGatesWithData.remove();
enqueuedInputGatesWithData.remove(inputGate);
@@ -218,7 +231,7 @@ public class UnionInputGate implements InputGate, InputGateListener {
// In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data.
Optional<BufferOrEvent> bufferOrEvent = inputGate.pollNextBufferOrEvent();
if (bufferOrEvent.isPresent()) {
- return new InputGateWithData(inputGate, bufferOrEvent.get(), moreInputGatesAvailable);
+ return Optional.of(new InputGateWithData(inputGate, bufferOrEvent.get(), moreInputGatesAvailable));
}
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 71e4f5a..d82d571 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -619,17 +619,7 @@ public class SingleInputGateTest {
assertEquals(expectedChannelIndex, bufferOrEvent.get().getChannelIndex());
assertEquals(expectedMoreAvailable, bufferOrEvent.get().moreAvailable());
if (!expectedMoreAvailable) {
- try {
- assertFalse(inputGate.pollNextBufferOrEvent().isPresent());
- }
- catch (UnsupportedOperationException ex) {
- /**
- * {@link UnionInputGate#pollNextBufferOrEvent()} is unsupported at the moment.
- */
- if (!(inputGate instanceof UnionInputGate)) {
- throw ex;
- }
- }
+ assertFalse(inputGate.pollNextBufferOrEvent().isPresent());
}
}
}