You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/07 16:13:47 UTC

[2/2] flink git commit: [FLINK-5274] [network] Handle reader release in LocalInputChannel

[FLINK-5274] [network] Handle reader release in LocalInputChannel


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1b472d23
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1b472d23
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1b472d23

Branch: refs/heads/release-1.1
Commit: 1b472d232bbbee555c9895db6e9f51fc146d9d24
Parents: 75b48ed
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Dec 7 13:39:18 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Dec 7 17:13:01 2016 +0100

----------------------------------------------------------------------
 .../partition/consumer/LocalInputChannel.java   | 13 ++++++
 .../consumer/LocalInputChannelTest.java         | 47 ++++++++++++++++++++
 2 files changed, 60 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1b472d23/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 51748b7..dcb6046 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
@@ -179,6 +180,18 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
 		}
 
 		Buffer next = subpartitionView.getNextBuffer();
+
+		if (next == null) {
+			if (subpartitionView.isReleased()) {
+				throw new CancelTaskException("Consumed partition " + subpartitionView + " has been released.");
+			} else {
+				// This means there is a bug in the buffer availability
+				// notifications.
+				throw new IllegalStateException("Consumed partition has no buffers available. " +
+					"Number of received buffer notifications is " + numBuffersAvailable + ".");
+			}
+		}
+
 		long remaining = numBuffersAvailable.decrementAndGet();
 
 		if (remaining >= 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1b472d23/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 974ac9f..5d0a106 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -344,6 +344,53 @@ public class LocalInputChannelTest {
 		requester.join();
 	}
 
+	/**
+	 * Tests that reading from a channel when after the partition has been
+	 * released are handled and don't lead to NPEs.
+	 */
+	@Test
+	public void testGetNextAfterPartitionReleased() throws Exception {
+		ResultSubpartitionView reader = mock(ResultSubpartitionView.class);
+		SingleInputGate gate = mock(SingleInputGate.class);
+		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
+
+		when(partitionManager.createSubpartitionView(
+				any(ResultPartitionID.class),
+				anyInt(),
+				any(BufferProvider.class),
+				any(BufferAvailabilityListener.class))).thenReturn(reader);
+
+		LocalInputChannel channel = new LocalInputChannel(
+			gate,
+			0,
+			new ResultPartitionID(),
+			partitionManager,
+			new TaskEventDispatcher(),
+			new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+
+		channel.requestSubpartition(0);
+
+		// Null buffer but not released
+		when(reader.getNextBuffer()).thenReturn(null);
+		when(reader.isReleased()).thenReturn(false);
+
+		try {
+			channel.getNextBuffer();
+			fail("Did not throw expected IllegalStateException");
+		} catch (IllegalStateException ignored) {
+		}
+
+		// Null buffer and released
+		when(reader.getNextBuffer()).thenReturn(null);
+		when(reader.isReleased()).thenReturn(true);
+
+		try {
+			channel.getNextBuffer();
+			fail("Did not throw expected CancelTaskException");
+		} catch (CancelTaskException ignored) {
+		}
+	}
+
 	// ---------------------------------------------------------------------------------------------
 
 	private LocalInputChannel createLocalInputChannel(