You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/10 13:23:36 UTC

[flink] 02/08: [hotfix][network] Implement UnionInputGate#pollNextBufferOrEvent method

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cd13fdfa0c8b0db7c0087d9fe40f1522ccb5d122
Author: sunhaibotb <su...@163.com>
AuthorDate: Tue May 7 13:52:50 2019 +0200

    [hotfix][network] Implement UnionInputGate#pollNextBufferOrEvent method
---
 .../network/partition/consumer/UnionInputGate.java | 31 +++++++++++++++-------
 .../partition/consumer/SingleInputGateTest.java    | 12 +--------
 2 files changed, 23 insertions(+), 20 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index ea83004..196743e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -159,6 +159,15 @@ public class UnionInputGate implements InputGate, InputGateListener {
 
 	@Override
 	public Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException {
+		return getNextBufferOrEvent(true);
+	}
+
+	@Override
+	public Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, InterruptedException {
+		return getNextBufferOrEvent(false);
+	}
+
+	private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException {
 		if (inputGatesWithRemainingData.isEmpty()) {
 			return Optional.empty();
 		}
@@ -166,7 +175,12 @@ public class UnionInputGate implements InputGate, InputGateListener {
 		// Make sure to request the partitions, if they have not been requested before.
 		requestPartitions();
 
-		InputGateWithData inputGateWithData = waitAndGetNextInputGate();
+		Optional<InputGateWithData> next = waitAndGetNextInputGate(blocking);
+		if (!next.isPresent()) {
+			return Optional.empty();
+		}
+
+		InputGateWithData inputGateWithData = next.get();
 		InputGate inputGate = inputGateWithData.inputGate;
 		BufferOrEvent bufferOrEvent = inputGateWithData.bufferOrEvent;
 
@@ -197,18 +211,17 @@ public class UnionInputGate implements InputGate, InputGateListener {
 		return Optional.of(bufferOrEvent);
 	}
 
-	@Override
-	public Optional<BufferOrEvent> pollNextBufferOrEvent() throws UnsupportedOperationException {
-		throw new UnsupportedOperationException();
-	}
-
-	private InputGateWithData waitAndGetNextInputGate() throws IOException, InterruptedException {
+	private Optional<InputGateWithData> waitAndGetNextInputGate(boolean blocking) throws IOException, InterruptedException {
 		while (true) {
 			InputGate inputGate;
 			boolean moreInputGatesAvailable;
 			synchronized (inputGatesWithData) {
 				while (inputGatesWithData.size() == 0) {
-					inputGatesWithData.wait();
+					if (blocking) {
+						inputGatesWithData.wait();
+					} else {
+						return Optional.empty();
+					}
 				}
 				inputGate = inputGatesWithData.remove();
 				enqueuedInputGatesWithData.remove(inputGate);
@@ -218,7 +231,7 @@ public class UnionInputGate implements InputGate, InputGateListener {
 			// In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data.
 			Optional<BufferOrEvent> bufferOrEvent = inputGate.pollNextBufferOrEvent();
 			if (bufferOrEvent.isPresent()) {
-				return new InputGateWithData(inputGate, bufferOrEvent.get(), moreInputGatesAvailable);
+				return Optional.of(new InputGateWithData(inputGate, bufferOrEvent.get(), moreInputGatesAvailable));
 			}
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 71e4f5a..d82d571 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -619,17 +619,7 @@ public class SingleInputGateTest {
 		assertEquals(expectedChannelIndex, bufferOrEvent.get().getChannelIndex());
 		assertEquals(expectedMoreAvailable, bufferOrEvent.get().moreAvailable());
 		if (!expectedMoreAvailable) {
-			try {
-				assertFalse(inputGate.pollNextBufferOrEvent().isPresent());
-			}
-			catch (UnsupportedOperationException ex) {
-				/**
-				 * {@link UnionInputGate#pollNextBufferOrEvent()} is unsupported at the moment.
-				 */
-				if (!(inputGate instanceof UnionInputGate)) {
-					throw ex;
-				}
-			}
+			assertFalse(inputGate.pollNextBufferOrEvent().isPresent());
 		}
 	}
 }