You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/07 16:13:47 UTC
[2/2] flink git commit: [FLINK-5274] [network] Handle reader release
in LocalInputChannel
[FLINK-5274] [network] Handle reader release in LocalInputChannel
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1b472d23
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1b472d23
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1b472d23
Branch: refs/heads/release-1.1
Commit: 1b472d232bbbee555c9895db6e9f51fc146d9d24
Parents: 75b48ed
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Dec 7 13:39:18 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Dec 7 17:13:01 2016 +0100
----------------------------------------------------------------------
.../partition/consumer/LocalInputChannel.java | 13 ++++++
.../consumer/LocalInputChannelTest.java | 47 ++++++++++++++++++++
2 files changed, 60 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1b472d23/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 51748b7..dcb6046 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
@@ -179,6 +180,18 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
}
Buffer next = subpartitionView.getNextBuffer();
+
+ if (next == null) {
+ if (subpartitionView.isReleased()) {
+ throw new CancelTaskException("Consumed partition " + subpartitionView + " has been released.");
+ } else {
+ // This means there is a bug in the buffer availability
+ // notifications.
+ throw new IllegalStateException("Consumed partition has no buffers available. " +
+ "Number of received buffer notifications is " + numBuffersAvailable + ".");
+ }
+ }
+
long remaining = numBuffersAvailable.decrementAndGet();
if (remaining >= 0) {
http://git-wip-us.apache.org/repos/asf/flink/blob/1b472d23/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 974ac9f..5d0a106 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -344,6 +344,53 @@ public class LocalInputChannelTest {
requester.join();
}
+ /**
+ * Tests that reading from a channel after the partition has been released
+ * is handled with an explicit exception and doesn't lead to an NPE.
+ */
+ @Test
+ public void testGetNextAfterPartitionReleased() throws Exception {
+ ResultSubpartitionView reader = mock(ResultSubpartitionView.class);
+ SingleInputGate gate = mock(SingleInputGate.class);
+ ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
+
+ when(partitionManager.createSubpartitionView(
+ any(ResultPartitionID.class),
+ anyInt(),
+ any(BufferProvider.class),
+ any(BufferAvailabilityListener.class))).thenReturn(reader);
+
+ LocalInputChannel channel = new LocalInputChannel(
+ gate,
+ 0,
+ new ResultPartitionID(),
+ partitionManager,
+ new TaskEventDispatcher(),
+ new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+
+ channel.requestSubpartition(0);
+
+ // Null buffer but view not released: expect IllegalStateException
+ when(reader.getNextBuffer()).thenReturn(null);
+ when(reader.isReleased()).thenReturn(false);
+
+ try {
+ channel.getNextBuffer();
+ fail("Did not throw expected IllegalStateException");
+ } catch (IllegalStateException ignored) {
+ }
+
+ // Null buffer and view released: expect CancelTaskException
+ when(reader.getNextBuffer()).thenReturn(null);
+ when(reader.isReleased()).thenReturn(true);
+
+ try {
+ channel.getNextBuffer();
+ fail("Did not throw expected CancelTaskException");
+ } catch (CancelTaskException ignored) {
+ }
+ }
+
// ---------------------------------------------------------------------------------------------
private LocalInputChannel createLocalInputChannel(