You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/10 13:23:41 UTC

[flink] 07/08: [hotfix][network] Refactor InputGates code

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 261efa8f6a321a9303705c35681c0d67880213bb
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu May 9 10:42:33 2019 +0200

    [hotfix][network] Refactor InputGates code
---
 .../io/network/partition/consumer/InputGate.java   | 17 ++++++
 .../partition/consumer/SingleInputGate.java        | 51 +++++++++++------
 .../network/partition/consumer/UnionInputGate.java | 64 +++++++++++-----------
 3 files changed, 83 insertions(+), 49 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 83e18e9..03ac822 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * An input gate consumes one or more partitions of a single produced intermediate result.
  *
@@ -112,4 +114,19 @@ public abstract class InputGate implements AutoCloseable {
 			isAvailable = new CompletableFuture<>();
 		}
 	}
+
+	/**
+	 * Simple pojo for INPUT, DATA and moreAvailable.
+	 */
+	protected static class InputWithData<INPUT, DATA> {
+		protected final INPUT input;
+		protected final DATA data;
+		protected final boolean moreAvailable;
+
+		InputWithData(INPUT input, DATA data, boolean moreAvailable) {
+			this.input = checkNotNull(input);
+			this.data = checkNotNull(data);
+			this.moreAvailable = moreAvailable;
+		}
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index c0830fa..19912b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -529,12 +529,21 @@ public class SingleInputGate extends InputGate {
 		}
 
 		requestPartitions();
+		Optional<InputWithData<InputChannel, BufferAndAvailability>> next = waitAndGetNextData(blocking);
+		if (!next.isPresent()) {
+			return Optional.empty();
+		}
 
-		InputChannel currentChannel;
-		boolean moreAvailable;
-		Optional<BufferAndAvailability> result = Optional.empty();
+		InputWithData<InputChannel, BufferAndAvailability> inputWithData = next.get();
+		return Optional.of(transformToBufferOrEvent(
+			inputWithData.data.buffer(),
+			inputWithData.moreAvailable,
+			inputWithData.input));
+	}
 
-		do {
+	private Optional<InputWithData<InputChannel, BufferAndAvailability>> waitAndGetNextData(boolean blocking)
+			throws IOException, InterruptedException {
+		while (true) {
 			synchronized (inputChannelsWithData) {
 				while (inputChannelsWithData.size() == 0) {
 					if (isReleased) {
@@ -550,29 +559,38 @@ public class SingleInputGate extends InputGate {
 					}
 				}
 
-				currentChannel = inputChannelsWithData.remove();
+				InputChannel inputChannel = inputChannelsWithData.remove();
 
-				result = currentChannel.getNextBuffer();
+				Optional<BufferAndAvailability> result = inputChannel.getNextBuffer();
 
 				if (result.isPresent() && result.get().moreAvailable()) {
-					// enqueue the currentChannel at the end to avoid starvation
-					inputChannelsWithData.add(currentChannel);
+					// enqueue the inputChannel at the end to avoid starvation
+					inputChannelsWithData.add(inputChannel);
 				} else {
-					enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
+					enqueuedInputChannelsWithData.clear(inputChannel.getChannelIndex());
 				}
 
-				moreAvailable = !inputChannelsWithData.isEmpty();
-
-				if (!moreAvailable) {
+				if (inputChannelsWithData.isEmpty()) {
 					resetIsAvailable();
 				}
+
+				if (result.isPresent()) {
+					return Optional.of(new InputWithData<>(
+						inputChannel,
+						result.get(),
+						!inputChannelsWithData.isEmpty()));
+				}
 			}
-		} while (!result.isPresent());
+		}
+	}
 
-		final Buffer buffer = result.get().buffer();
+	private BufferOrEvent transformToBufferOrEvent(
+			Buffer buffer,
+			boolean moreAvailable,
+			InputChannel currentChannel) throws IOException, InterruptedException {
 		numBytesIn.inc(buffer.getSizeUnsafe());
 		if (buffer.isBuffer()) {
-			return Optional.of(new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable));
+			return new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable);
 		}
 		else {
 			final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
@@ -591,11 +609,10 @@ public class SingleInputGate extends InputGate {
 				}
 
 				currentChannel.notifySubpartitionConsumed();
-
 				currentChannel.releaseAllResources();
 			}
 
-			return Optional.of(new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable));
+			return new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable);
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 986d1bb..5019cfc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -182,36 +182,22 @@ public class UnionInputGate extends InputGate {
 		// Make sure to request the partitions, if they have not been requested before.
 		requestPartitions();
 
-		Optional<InputGateWithData> next = waitAndGetNextInputGate(blocking);
+		Optional<InputWithData<InputGate, BufferOrEvent>> next = waitAndGetNextData(blocking);
 		if (!next.isPresent()) {
 			return Optional.empty();
 		}
 
-		InputGateWithData inputGateWithData = next.get();
-		InputGate inputGate = inputGateWithData.inputGate;
-		BufferOrEvent bufferOrEvent = inputGateWithData.bufferOrEvent;
+		InputWithData<InputGate, BufferOrEvent> inputWithData = next.get();
 
-		if (bufferOrEvent.isEvent()
-			&& bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
-			&& inputGate.isFinished()) {
-
-			checkState(!bufferOrEvent.moreAvailable());
-			if (!inputGatesWithRemainingData.remove(inputGate)) {
-				throw new IllegalStateException("Couldn't find input gate in set of remaining " +
-					"input gates.");
-			}
-		}
-
-		// Set the channel index to identify the input channel (across all unioned input gates)
-		final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate);
-
-		bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex());
-		bufferOrEvent.setMoreAvailable(bufferOrEvent.moreAvailable() || inputGateWithData.moreInputGatesAvailable);
-
-		return Optional.of(bufferOrEvent);
+		handleEndOfPartitionEvent(inputWithData.data, inputWithData.input);
+		return Optional.of(adjustForUnionInputGate(
+			inputWithData.data,
+			inputWithData.input,
+			inputWithData.moreAvailable));
 	}
 
-	private Optional<InputGateWithData> waitAndGetNextInputGate(boolean blocking) throws IOException, InterruptedException {
+	private Optional<InputWithData<InputGate, BufferOrEvent>> waitAndGetNextData(boolean blocking)
+			throws IOException, InterruptedException {
 		while (true) {
 			synchronized (inputGatesWithData) {
 				while (inputGatesWithData.size() == 0) {
@@ -242,7 +228,7 @@ public class UnionInputGate extends InputGate {
 				}
 
 				if (bufferOrEvent.isPresent()) {
-					return Optional.of(new InputGateWithData(
+					return Optional.of(new InputWithData<>(
 						inputGate,
 						bufferOrEvent.get(),
 						!inputGatesWithData.isEmpty()));
@@ -251,15 +237,29 @@ public class UnionInputGate extends InputGate {
 		}
 	}
 
-	private static class InputGateWithData {
-		private final InputGate inputGate;
-		private final BufferOrEvent bufferOrEvent;
-		private final boolean moreInputGatesAvailable;
+	private BufferOrEvent adjustForUnionInputGate(
+		BufferOrEvent bufferOrEvent,
+		InputGate inputGate,
+		boolean moreInputGatesAvailable) {
+		// Set the channel index to identify the input channel (across all unioned input gates)
+		final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate);
+
+		bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex());
+		bufferOrEvent.setMoreAvailable(bufferOrEvent.moreAvailable() || moreInputGatesAvailable);
 
-		InputGateWithData(InputGate inputGate, BufferOrEvent bufferOrEvent, boolean moreInputGatesAvailable) {
-			this.inputGate = checkNotNull(inputGate);
-			this.bufferOrEvent = checkNotNull(bufferOrEvent);
-			this.moreInputGatesAvailable = moreInputGatesAvailable;
+		return bufferOrEvent;
+	}
+
+	private void handleEndOfPartitionEvent(BufferOrEvent bufferOrEvent, InputGate inputGate) {
+		if (bufferOrEvent.isEvent()
+			&& bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
+			&& inputGate.isFinished()) {
+
+			checkState(!bufferOrEvent.moreAvailable());
+			if (!inputGatesWithRemainingData.remove(inputGate)) {
+				throw new IllegalStateException("Couldn't find input gate in set of remaining " +
+					"input gates.");
+			}
 		}
 	}