You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/03/24 13:47:36 UTC
flink git commit: [runtime] Return null at input gate after all
end-of-partition events
Repository: flink
Updated Branches:
refs/heads/master 4c71e90e4 -> 8b3cbb525
[runtime] Return null at input gate after all end-of-partition events
Previously, users of an input gate had to know themselves whether it was still
safe to query the gate for more data. If a user mistakenly queried an input
gate that had already received all end-of-partition events, the blocking call
never returned. The input gate now returns null in that case.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b3cbb52
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b3cbb52
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b3cbb52
Branch: refs/heads/master
Commit: 8b3cbb52527f77ed43f3035c88c5750abef2c5de
Parents: 4c71e90
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Mar 24 11:13:26 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Mar 24 13:47:26 2015 +0100
----------------------------------------------------------------------
.../partition/consumer/SingleInputGate.java | 18 ++++++-
.../partition/consumer/UnionInputGate.java | 31 +++++++++--
.../partition/consumer/SingleInputGateTest.java | 56 ++++++++++++++++++++
.../partition/consumer/UnionInputGateTest.java | 51 ++++++++++++++----
.../io/network/util/TestInputChannel.java | 5 ++
5 files changed, 144 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8b3cbb52/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 5b97d26..867a4b9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.BitSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@@ -124,12 +125,16 @@ public class SingleInputGate implements InputGate {
/** Channels, which notified this input gate about available data. */
private final BlockingQueue<InputChannel> inputChannelsWithData = new LinkedBlockingQueue<InputChannel>();
+ private final BitSet channelsWithEndOfPartitionEvents;
+
/**
* Buffer pool for incoming buffers. Incoming data from remote channels is copied to buffers
* from this pool.
*/
private BufferPool bufferPool;
+ private boolean hasReceivedAllEndOfPartitionEvents;
+
/** Flag indicating whether partitions have been requested. */
private boolean requestedPartitionsFlag;
@@ -153,6 +158,7 @@ public class SingleInputGate implements InputGate {
this.numberOfInputChannels = numberOfInputChannels;
this.inputChannels = Maps.newHashMapWithExpectedSize(numberOfInputChannels);
+ this.channelsWithEndOfPartitionEvents = new BitSet(numberOfInputChannels);
}
// ------------------------------------------------------------------------
@@ -311,8 +317,12 @@ public class SingleInputGate implements InputGate {
@Override
public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
+ if (hasReceivedAllEndOfPartitionEvents) {
+ return null;
+ }
+
if (isReleased) {
- throw new IllegalStateException("The input has already been consumed. This indicates misuse of the input gate.");
+ throw new IllegalStateException("Already released.");
}
requestPartitions();
@@ -337,6 +347,12 @@ public class SingleInputGate implements InputGate {
final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
if (event.getClass() == EndOfPartitionEvent.class) {
+ channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
+
+ if (channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) {
+ hasReceivedAllEndOfPartitionEvents = true;
+ }
+
currentChannel.notifySubpartitionConsumed();
currentChannel.releaseAllResources();
http://git-wip-us.apache.org/repos/asf/flink/blob/8b3cbb52/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 5a7a5b0..1f974de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -19,12 +19,15 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.util.event.EventListener;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
@@ -34,8 +37,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
/**
* Input gate wrapper to union the input from multiple input gates.
- * <p>
- * Each input gate has input channels attached from which it reads data. At each input gate, the
+ *
+ * <p> Each input gate has input channels attached from which it reads data. At each input gate, the
* input channels have unique IDs from 0 (inclusive) to the number of input channels (exclusive).
*
* <pre>
@@ -65,6 +68,8 @@ public class UnionInputGate implements InputGate {
/** The input gates to union. */
private final InputGate[] inputGates;
+ private final Set<InputGate> inputGatesWithRemainingData;
+
/** Data availability listener across all unioned input gates. */
private final InputGateListener inputGateListener;
@@ -85,12 +90,14 @@ public class UnionInputGate implements InputGate {
checkArgument(inputGates.length > 1, "Union input gate should union at least two input gates.");
this.inputGateToIndexOffsetMap = Maps.newHashMapWithExpectedSize(inputGates.length);
+ this.inputGatesWithRemainingData = Sets.newHashSetWithExpectedSize(inputGates.length);
int currentNumberOfInputChannels = 0;
for (InputGate inputGate : inputGates) {
// The offset to use for buffer or event instances received from this input gate.
inputGateToIndexOffsetMap.put(checkNotNull(inputGate), currentNumberOfInputChannels);
+ inputGatesWithRemainingData.add(inputGate);
currentNumberOfInputChannels += inputGate.getNumberOfInputChannels();
}
@@ -133,6 +140,10 @@ public class UnionInputGate implements InputGate {
@Override
public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
+ if (inputGatesWithRemainingData.isEmpty()) {
+ return null;
+ }
+
// Make sure to request the partitions, if they have not been requested before.
requestPartitions();
@@ -140,6 +151,16 @@ public class UnionInputGate implements InputGate {
final BufferOrEvent bufferOrEvent = inputGate.getNextBufferOrEvent();
+ if (bufferOrEvent.isEvent()
+ && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
+ && inputGate.isFinished()) {
+
+ if (!inputGatesWithRemainingData.remove(inputGate)) {
+ throw new IllegalStateException("Couldn't find input gate in set of remaining " +
+ "input gates.");
+ }
+ }
+
// Set the channel index to identify the input channel (across all unioned input gates)
final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate);
@@ -163,9 +184,9 @@ public class UnionInputGate implements InputGate {
/**
* Data availability listener at all unioned input gates.
- * <p>
- * The listener registers itself at each input gate and is notified for *each incoming buffer*
- * at one of the unioned input gates.
+ *
+ * <p> The listener registers itself at each input gate and is notified for *each incoming
+ * buffer* at one of the unioned input gates.
*/
private static class InputGateListener implements EventListener<InputGate> {
http://git-wip-us.apache.org/repos/asf/flink/blob/8b3cbb52/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 6cd5469..e1e3cff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -32,11 +32,17 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.io.network.util.TestInputChannel;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.junit.Test;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.mock;
@@ -46,6 +52,44 @@ import static org.mockito.Mockito.when;
public class SingleInputGateTest {
+ /**
+ * Tests basic correctness of buffer-or-event interleaving and correct <code>null</code> return
+ * value after receiving all end-of-partition events.
+ */
+ @Test(timeout = 120 * 1000)
+ public void testBasicGetNextLogic() throws Exception {
+ // Setup
+ final SingleInputGate inputGate = new SingleInputGate(new IntermediateDataSetID(), 0, 2);
+
+ final TestInputChannel[] inputChannels = new TestInputChannel[]{
+ new TestInputChannel(inputGate, 0),
+ new TestInputChannel(inputGate, 1)
+ };
+
+ inputGate.setInputChannel(
+ new IntermediateResultPartitionID(), inputChannels[0].getInputChannel());
+
+ inputGate.setInputChannel(
+ new IntermediateResultPartitionID(), inputChannels[1].getInputChannel());
+
+ // Test
+ inputChannels[0].readBuffer();
+ inputChannels[0].readBuffer();
+ inputChannels[1].readBuffer();
+ inputChannels[1].readEndOfPartitionEvent();
+ inputChannels[0].readEndOfPartitionEvent();
+
+ verifyBufferOrEvent(inputGate, true, 0);
+ verifyBufferOrEvent(inputGate, true, 0);
+ verifyBufferOrEvent(inputGate, true, 1);
+ verifyBufferOrEvent(inputGate, false, 1);
+ verifyBufferOrEvent(inputGate, false, 0);
+
+ // Return null when the input gate has received all end-of-partition events
+ assertTrue(inputGate.isFinished());
+ assertNull(inputGate.getNextBufferOrEvent());
+ }
+
@Test
@SuppressWarnings("unchecked")
public void testBackwardsEventWithUninitializedChannel() throws Exception {
@@ -101,4 +145,16 @@ public class SingleInputGateTest {
verify(partitionManager, times(2)).getSubpartition(any(ResultPartitionID.class), anyInt(), any(Optional.class));
verify(taskEventDispatcher, times(2)).publish(any(ResultPartitionID.class), any(TaskEvent.class));
}
+
+ // ---------------------------------------------------------------------------------------------
+
+ static void verifyBufferOrEvent(
+ InputGate inputGate,
+ boolean isBuffer,
+ int channelIndex) throws IOException, InterruptedException {
+
+ final BufferOrEvent boe = inputGate.getNextBufferOrEvent();
+ assertEquals(isBuffer, boe.isBuffer());
+ assertEquals(channelIndex, boe.getChannelIndex());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8b3cbb52/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 131c4f6..c7cb413 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -23,12 +23,21 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
public class UnionInputGateTest {
- @Test
- public void testChannelMapping() throws Exception {
-
+ /**
+ * Tests basic correctness of buffer-or-event interleaving and correct <code>null</code> return
+ * value after receiving all end-of-partition events.
+ *
+ * <p> For buffer-or-event instances, it is important to verify that their channel index
+ * has been offset to the correct logical index across all unioned input gates.
+ */
+ @Test(timeout = 120 * 1000)
+ public void testBasicGetNextLogic() throws Exception {
+ // Setup
final SingleInputGate ig1 = new SingleInputGate(new IntermediateDataSetID(), 0, 3);
final SingleInputGate ig2 = new SingleInputGate(new IntermediateDataSetID(), 0, 5);
@@ -42,21 +51,41 @@ public class UnionInputGateTest {
};
inputChannels[0][0].readBuffer(); // 0 => 0
+ inputChannels[0][0].readEndOfPartitionEvent(); // 0 => 0
inputChannels[1][2].readBuffer(); // 2 => 5
+ inputChannels[1][2].readEndOfPartitionEvent(); // 2 => 5
inputChannels[1][0].readBuffer(); // 0 => 3
inputChannels[1][1].readBuffer(); // 1 => 4
inputChannels[0][1].readBuffer(); // 1 => 1
inputChannels[1][3].readBuffer(); // 3 => 6
+ inputChannels[0][1].readEndOfPartitionEvent(); // 1 => 1
+ inputChannels[1][3].readEndOfPartitionEvent(); // 3 => 6
inputChannels[0][2].readBuffer(); // 2 => 2
+ inputChannels[0][2].readEndOfPartitionEvent(); // 2 => 2
inputChannels[1][4].readBuffer(); // 4 => 7
+ inputChannels[1][4].readEndOfPartitionEvent(); // 4 => 7
+ inputChannels[1][1].readEndOfPartitionEvent(); // 1 => 4
+ inputChannels[1][0].readEndOfPartitionEvent(); // 0 => 3
+
+ SingleInputGateTest.verifyBufferOrEvent(union, true, 0);
+ SingleInputGateTest.verifyBufferOrEvent(union, false, 0);
+ SingleInputGateTest.verifyBufferOrEvent(union, true, 5);
+ SingleInputGateTest.verifyBufferOrEvent(union, false, 5);
+ SingleInputGateTest.verifyBufferOrEvent(union, true, 3);
+ SingleInputGateTest.verifyBufferOrEvent(union, true, 4);
+ SingleInputGateTest.verifyBufferOrEvent(union, true, 1);
+ SingleInputGateTest.verifyBufferOrEvent(union, true, 6);
+ SingleInputGateTest.verifyBufferOrEvent(union, false, 1);
+ SingleInputGateTest.verifyBufferOrEvent(union, false, 6);
+ SingleInputGateTest.verifyBufferOrEvent(union, true, 2);
+ SingleInputGateTest.verifyBufferOrEvent(union, false, 2);
+ SingleInputGateTest.verifyBufferOrEvent(union, true, 7);
+ SingleInputGateTest.verifyBufferOrEvent(union, false, 7);
+ SingleInputGateTest.verifyBufferOrEvent(union, false, 4);
+ SingleInputGateTest.verifyBufferOrEvent(union, false, 3);
- assertEquals(0, union.getNextBufferOrEvent().getChannelIndex());
- assertEquals(5, union.getNextBufferOrEvent().getChannelIndex());
- assertEquals(3, union.getNextBufferOrEvent().getChannelIndex());
- assertEquals(4, union.getNextBufferOrEvent().getChannelIndex());
- assertEquals(1, union.getNextBufferOrEvent().getChannelIndex());
- assertEquals(6, union.getNextBufferOrEvent().getChannelIndex());
- assertEquals(2, union.getNextBufferOrEvent().getChannelIndex());
- assertEquals(7, union.getNextBufferOrEvent().getChannelIndex());
+ // Return null when the input gate has received all end-of-partition events
+ assertTrue(union.isFinished());
+ assertNull(union.getNextBufferOrEvent());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8b3cbb52/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java
index 306de4c..0e9e8e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInputChannel.java
@@ -113,6 +113,11 @@ public class TestInputChannel {
// ------------------------------------------------------------------------
+ /**
+ * Creates test input channels and attaches them to the specified input gate.
+ *
+ * @return The created test input channels.
+ */
public static TestInputChannel[] createInputChannels(SingleInputGate inputGate, int numberOfInputChannels) {
checkNotNull(inputGate);
checkArgument(numberOfInputChannels > 0);