You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/03/24 13:47:36 UTC

flink git commit: [runtime] Return null at input gate after all end-of-partition events

Repository: flink
Updated Branches:
  refs/heads/master 4c71e90e4 -> 8b3cbb525


[runtime] Return null at input gate after all end-of-partition events

Previously, the input gate user needed to be aware whether it is safe to query
the input gate for more data or not. If a user mistakenly queried an input gate
for data after all end-of-partition events had been received, the blocking call
never returned.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b3cbb52
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b3cbb52
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b3cbb52

Branch: refs/heads/master
Commit: 8b3cbb52527f77ed43f3035c88c5750abef2c5de
Parents: 4c71e90
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Mar 24 11:13:26 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Mar 24 13:47:26 2015 +0100

----------------------------------------------------------------------
 .../partition/consumer/SingleInputGate.java     | 18 ++++++-
 .../partition/consumer/UnionInputGate.java      | 31 +++++++++--
 .../partition/consumer/SingleInputGateTest.java | 56 ++++++++++++++++++++
 .../partition/consumer/UnionInputGateTest.java  | 51 ++++++++++++++----
 .../io/network/util/TestInputChannel.java       |  5 ++
 5 files changed, 144 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8b3cbb52/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 5b97d26..867a4b9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
@@ -124,12 +125,16 @@ public class SingleInputGate implements InputGate {
 	/** Channels, which notified this input gate about available data. */
 	private final BlockingQueue<InputChannel> inputChannelsWithData = new LinkedBlockingQueue<InputChannel>();
 
+	private final BitSet channelsWithEndOfPartitionEvents;
+
 	/**
 	 * Buffer pool for incoming buffers. Incoming data from remote channels is copied to buffers
 	 * from this pool.
 	 */
 	private BufferPool bufferPool;
 
+	private boolean hasReceivedAllEndOfPartitionEvents;
+
 	/** Flag indicating whether partitions have been requested. */
 	private boolean requestedPartitionsFlag;
 
@@ -153,6 +158,7 @@ public class SingleInputGate implements InputGate {
 		this.numberOfInputChannels = numberOfInputChannels;
 
 		this.inputChannels = Maps.newHashMapWithExpectedSize(numberOfInputChannels);
+		this.channelsWithEndOfPartitionEvents = new BitSet(numberOfInputChannels);
 	}
 
 	// ------------------------------------------------------------------------
@@ -311,8 +317,12 @@ public class SingleInputGate implements InputGate {
 	@Override
 	public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
 
+		if (hasReceivedAllEndOfPartitionEvents) {
+			return null;
+		}
+
 		if (isReleased) {
-			throw new IllegalStateException("The input has already been consumed. This indicates misuse of the input gate.");
+			throw new IllegalStateException("Already released.");
 		}
 
 		requestPartitions();
@@ -337,6 +347,12 @@ public class SingleInputGate implements InputGate {
 			final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
 
 			if (event.getClass() == EndOfPartitionEvent.class) {
+				channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
+
+				if (channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) {
+					hasReceivedAllEndOfPartitionEvents = true;
+				}
+
 				currentChannel.notifySubpartitionConsumed();
 
 				currentChannel.releaseAllResources();

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3cbb52/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 5a7a5b0..1f974de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -19,12 +19,15 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.util.event.EventListener;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -34,8 +37,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
  * Input gate wrapper to union the input from multiple input gates.
- * <p>
- * Each input gate has input channels attached from which it reads data. At each input gate, the
+ *
+ * <p> Each input gate has input channels attached from which it reads data. At each input gate, the
  * input channels have unique IDs from 0 (inclusive) to the number of input channels (exclusive).
  *
  * <pre>
@@ -65,6 +68,8 @@ public class UnionInputGate implements InputGate {
 	/** The input gates to union. */
 	private final InputGate[] inputGates;
 
+	private final Set<InputGate> inputGatesWithRemainingData;
+
 	/** Data availability listener across all unioned input gates. */
 	private final InputGateListener inputGateListener;
 
@@ -85,12 +90,14 @@ public class UnionInputGate implements InputGate {
 		checkArgument(inputGates.length > 1, "Union input gate should union at least two input gates.");
 
 		this.inputGateToIndexOffsetMap = Maps.newHashMapWithExpectedSize(inputGates.length);
+		this.inputGatesWithRemainingData = Sets.newHashSetWithExpectedSize(inputGates.length);
 
 		int currentNumberOfInputChannels = 0;
 
 		for (InputGate inputGate : inputGates) {
 			// The offset to use for buffer or event instances received from this input gate.
 			inputGateToIndexOffsetMap.put(checkNotNull(inputGate), currentNumberOfInputChannels);
+			inputGatesWithRemainingData.add(inputGate);
 
 			currentNumberOfInputChannels += inputGate.getNumberOfInputChannels();
 		}
@@ -133,6 +140,10 @@ public class UnionInputGate implements InputGate {
 	@Override
 	public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
 
+		if (inputGatesWithRemainingData.isEmpty()) {
+			return null;
+		}
+
 		// Make sure to request the partitions, if they have not been requested before.
 		requestPartitions();
 
@@ -140,6 +151,16 @@ public class UnionInputGate implements InputGate {
 
 		final BufferOrEvent bufferOrEvent = inputGate.getNextBufferOrEvent();
 
+		if (bufferOrEvent.isEvent()
+				&& bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
+				&& inputGate.isFinished()) {
+
+			if (!inputGatesWithRemainingData.remove(inputGate)) {
+				throw new IllegalStateException("Couldn't find input gate in set of remaining " +
+						"input gates.");
+			}
+		}
+
 		// Set the channel index to identify the input channel (across all unioned input gates)
 		final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate);
 
@@ -163,9 +184,9 @@ public class UnionInputGate implements InputGate {
 
 	/**
 	 * Data availability listener at all unioned input gates.
-	 * <p>
-	 * The listener registers itself at each input gate and is notified for *each incoming buffer*
-	 * at one of the unioned input gates.
+	 *
+	 * <p> The listener registers itself at each input gate and is notified for *each incoming
+	 * buffer* at one of the unioned input gates.
 	 */
 	private static class InputGateListener implements EventListener<InputGate> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3cbb52/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 6cd5469..e1e3cff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -32,11 +32,17 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.util.TestInputChannel;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.junit.Test;
 
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.mock;
@@ -46,6 +52,44 @@ import static org.mockito.Mockito.when;
 
 public class SingleInputGateTest {
 
+	/**
+	 * Tests basic correctness of buffer-or-event interleaving and correct <code>null</code> return
+	 * value after receiving all end-of-partition events.
+	 */
+	@Test(timeout = 120 * 1000)
+	public void testBasicGetNextLogic() throws Exception {
+		// Setup
+		final SingleInputGate inputGate = new SingleInputGate(new IntermediateDataSetID(), 0, 2);
+
+		final TestInputChannel[] inputChannels = new TestInputChannel[]{
+				new TestInputChannel(inputGate, 0),
+				new TestInputChannel(inputGate, 1)
+		};
+
+		inputGate.setInputChannel(
+				new IntermediateResultPartitionID(), inputChannels[0].getInputChannel());
+
+		inputGate.setInputChannel(
+				new IntermediateResultPartitionID(), inputChannels[1].getInputChannel());
+
+		// Test
+		inputChannels[0].readBuffer();
+		inputChannels[0].readBuffer();
+		inputChannels[1].readBuffer();
+		inputChannels[1].readEndOfPartitionEvent();
+		inputChannels[0].readEndOfPartitionEvent();
+
+		verifyBufferOrEvent(inputGate, true, 0);
+		verifyBufferOrEvent(inputGate, true, 0);
+		verifyBufferOrEvent(inputGate, true, 1);
+		verifyBufferOrEvent(inputGate, false, 1);
+		verifyBufferOrEvent(inputGate, false, 0);
+
+		// Return null when the input gate has received all end-of-partition events
+		assertTrue(inputGate.isFinished());
+		assertNull(inputGate.getNextBufferOrEvent());
+	}
+
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testBackwardsEventWithUninitializedChannel() throws Exception {
@@ -101,4 +145,16 @@ public class SingleInputGateTest {
 		verify(partitionManager, times(2)).getSubpartition(any(ResultPartitionID.class), anyInt(), any(Optional.class));
 		verify(taskEventDispatcher, times(2)).publish(any(ResultPartitionID.class), any(TaskEvent.class));
 	}
+
+	// ---------------------------------------------------------------------------------------------
+
+	static void verifyBufferOrEvent(
+			InputGate inputGate,
+			boolean isBuffer,
+			int channelIndex) throws IOException, InterruptedException {
+
+		final BufferOrEvent boe = inputGate.getNextBufferOrEvent();
+		assertEquals(isBuffer, boe.isBuffer());
+		assertEquals(channelIndex, boe.getChannelIndex());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3cbb52/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 131c4f6..c7cb413 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -23,12 +23,21 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class UnionInputGateTest {
 
-	@Test
-	public void testChannelMapping() throws Exception {
-
+	/**
+	 * Tests basic correctness of buffer-or-event interleaving and correct <code>null</code> return
+	 * value after receiving all end-of-partition events.
+	 *
+	 * <p> For buffer-or-event instances, it is important to verify that their channel index has
+	 * been offset to the correct logical index.
+	 */
+	@Test(timeout = 120 * 1000)
+	public void testBasicGetNextLogic() throws Exception {
+		// Setup
 		final SingleInputGate ig1 = new SingleInputGate(new IntermediateDataSetID(), 0, 3);
 		final SingleInputGate ig2 = new SingleInputGate(new IntermediateDataSetID(), 0, 5);
 
@@ -42,21 +51,41 @@ public class UnionInputGateTest {
 		};
 
 		inputChannels[0][0].readBuffer(); // 0 => 0
+		inputChannels[0][0].readEndOfPartitionEvent(); // 0 => 0
 		inputChannels[1][2].readBuffer(); // 2 => 5
+		inputChannels[1][2].readEndOfPartitionEvent(); // 2 => 5
 		inputChannels[1][0].readBuffer(); // 0 => 3
 		inputChannels[1][1].readBuffer(); // 1 => 4
 		inputChannels[0][1].readBuffer(); // 1 => 1
 		inputChannels[1][3].readBuffer(); // 3 => 6
+		inputChannels[0][1].readEndOfPartitionEvent(); // 1 => 1
+		inputChannels[1][3].readEndOfPartitionEvent(); // 3 => 6
 		inputChannels[0][2].readBuffer(); // 1 => 2
+		inputChannels[0][2].readEndOfPartitionEvent(); // 2 => 2
 		inputChannels[1][4].readBuffer(); // 4 => 7
+		inputChannels[1][4].readEndOfPartitionEvent(); // 4 => 7
+		inputChannels[1][1].readEndOfPartitionEvent(); // 1 => 4
+		inputChannels[1][0].readEndOfPartitionEvent(); // 0 => 3
+
+		SingleInputGateTest.verifyBufferOrEvent(union, true, 0);
+		SingleInputGateTest.verifyBufferOrEvent(union, false, 0);
+		SingleInputGateTest.verifyBufferOrEvent(union, true, 5);
+		SingleInputGateTest.verifyBufferOrEvent(union, false, 5);
+		SingleInputGateTest.verifyBufferOrEvent(union, true, 3);
+		SingleInputGateTest.verifyBufferOrEvent(union, true, 4);
+		SingleInputGateTest.verifyBufferOrEvent(union, true, 1);
+		SingleInputGateTest.verifyBufferOrEvent(union, true, 6);
+		SingleInputGateTest.verifyBufferOrEvent(union, false, 1);
+		SingleInputGateTest.verifyBufferOrEvent(union, false, 6);
+		SingleInputGateTest.verifyBufferOrEvent(union, true, 2);
+		SingleInputGateTest.verifyBufferOrEvent(union, false, 2);
+		SingleInputGateTest.verifyBufferOrEvent(union, true, 7);
+		SingleInputGateTest.verifyBufferOrEvent(union, false, 7);
+		SingleInputGateTest.verifyBufferOrEvent(union, false, 4);
+		SingleInputGateTest.verifyBufferOrEvent(union, false, 3);
 
-		assertEquals(0, union.getNextBufferOrEvent().getChannelIndex());
-		assertEquals(5, union.getNextBufferOrEvent().getChannelIndex());
-		assertEquals(3, union.getNextBufferOrEvent().getChannelIndex());
-		assertEquals(4, union.getNextBufferOrEvent().getChannelIndex());
-		assertEquals(1, union.getNextBufferOrEvent().getChannelIndex());
-		assertEquals(6, union.getNextBufferOrEvent().getChannelIndex());
-		assertEquals(2, union.getNextBufferOrEvent().getChannelIndex());
-		assertEquals(7, union.getNextBufferOrEvent().getChannelIndex());
+		// Return null when the input gate has received all end-of-partition events
+		assertTrue(union.isFinished());
+		assertNull(union.getNextBufferOrEvent());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3cbb52/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java
index 306de4c..0e9e8e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java
@@ -113,6 +113,11 @@ public class TestInputChannel {
 
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Creates test input channels and attaches them to the specified input gate.
+	 *
+	 * @return The created test input channels.
+	 */
 	public static TestInputChannel[] createInputChannels(SingleInputGate inputGate, int numberOfInputChannels) {
 		checkNotNull(inputGate);
 		checkArgument(numberOfInputChannels > 0);