You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/07 16:13:46 UTC

[1/2] flink git commit: [FLINK-5275] [execgraph] Give more detailed error message if InputChannel deployment fails

Repository: flink
Updated Branches:
  refs/heads/release-1.1 75b48edd1 -> 4526005d2


[FLINK-5275] [execgraph] Give more detailed error message if InputChannel deployment fails


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4526005d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4526005d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4526005d

Branch: refs/heads/release-1.1
Commit: 4526005d29b697446a6d3a87b22fb0c33912713d
Parents: 1b472d2
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Dec 7 13:48:25 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Dec 7 17:13:01 2016 +0100

----------------------------------------------------------------------
 .../InputChannelDeploymentDescriptor.java          | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4526005d/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index 24b95ea..dc97bf3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -123,8 +123,21 @@ public class InputChannelDeploymentDescriptor implements Serializable {
 			else if (allowLazyDeployment) {
 				// The producing task might not have registered the partition yet
 				partitionLocation = ResultPartitionLocation.createUnknown();
-			} else {
-				throw new IllegalStateException("Trying to eagerly schedule a task whose inputs are not ready.");
+			}
+			else if (producerState == ExecutionState.CANCELING
+						|| producerState == ExecutionState.CANCELED
+						|| producerState == ExecutionState.FAILED) {
+				String msg = "Trying to schedule a task whose inputs were canceled or failed. " +
+					"The producer is in state " + producerState + ".";
+				throw new IllegalStateException(msg);
+			}
+			else {
+				String msg = String.format("Trying to eagerly schedule a task whose inputs " +
+					"are not ready (partition consumable? %s, producer state: %s, producer slot: %s).",
+						consumedPartition.isConsumable(),
+						producerState,
+						producerSlot);
+				throw new IllegalStateException(msg);
 			}
 
 			final ResultPartitionID consumedPartitionId = new ResultPartitionID(


[2/2] flink git commit: [FLINK-5274] [network] Handle reader release in LocalInputChannel

Posted by uc...@apache.org.
[FLINK-5274] [network] Handle reader release in LocalInputChannel


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1b472d23
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1b472d23
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1b472d23

Branch: refs/heads/release-1.1
Commit: 1b472d232bbbee555c9895db6e9f51fc146d9d24
Parents: 75b48ed
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Dec 7 13:39:18 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Dec 7 17:13:01 2016 +0100

----------------------------------------------------------------------
 .../partition/consumer/LocalInputChannel.java   | 13 ++++++
 .../consumer/LocalInputChannelTest.java         | 47 ++++++++++++++++++++
 2 files changed, 60 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1b472d23/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 51748b7..dcb6046 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
@@ -179,6 +180,18 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit
 		}
 
 		Buffer next = subpartitionView.getNextBuffer();
+
+		if (next == null) {
+			if (subpartitionView.isReleased()) {
+				throw new CancelTaskException("Consumed partition " + subpartitionView + " has been released.");
+			} else {
+				// This means there is a bug in the buffer availability
+				// notifications.
+				throw new IllegalStateException("Consumed partition has no buffers available. " +
+					"Number of received buffer notifications is " + numBuffersAvailable + ".");
+			}
+		}
+
 		long remaining = numBuffersAvailable.decrementAndGet();
 
 		if (remaining >= 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1b472d23/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 974ac9f..5d0a106 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -344,6 +344,53 @@ public class LocalInputChannelTest {
 		requester.join();
 	}
 
+	/**
+	 * Tests that reading from a channel after the partition has been
+	 * released is handled and doesn't lead to NPEs.
+	 */
+	@Test
+	public void testGetNextAfterPartitionReleased() throws Exception {
+		ResultSubpartitionView reader = mock(ResultSubpartitionView.class);
+		SingleInputGate gate = mock(SingleInputGate.class);
+		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
+
+		when(partitionManager.createSubpartitionView(
+				any(ResultPartitionID.class),
+				anyInt(),
+				any(BufferProvider.class),
+				any(BufferAvailabilityListener.class))).thenReturn(reader);
+
+		LocalInputChannel channel = new LocalInputChannel(
+			gate,
+			0,
+			new ResultPartitionID(),
+			partitionManager,
+			new TaskEventDispatcher(),
+			new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+
+		channel.requestSubpartition(0);
+
+		// Null buffer but not released
+		when(reader.getNextBuffer()).thenReturn(null);
+		when(reader.isReleased()).thenReturn(false);
+
+		try {
+			channel.getNextBuffer();
+			fail("Did not throw expected IllegalStateException");
+		} catch (IllegalStateException ignored) {
+		}
+
+		// Null buffer and released
+		when(reader.getNextBuffer()).thenReturn(null);
+		when(reader.isReleased()).thenReturn(true);
+
+		try {
+			channel.getNextBuffer();
+			fail("Did not throw expected CancelTaskException");
+		} catch (CancelTaskException ignored) {
+		}
+	}
+
 	// ---------------------------------------------------------------------------------------------
 
 	private LocalInputChannel createLocalInputChannel(