You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/07 16:14:09 UTC

[2/2] flink git commit: [FLINK-5275] [execgraph] Give more detailed error message if InputChannel deployment fails

[FLINK-5275] [execgraph] Give more detailed error message if InputChannel deployment fails


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4410c04a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4410c04a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4410c04a

Branch: refs/heads/master
Commit: 4410c04a68c7b247bb3d7113e5f40f2a9c2165af
Parents: 555a687
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Dec 7 13:48:25 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Dec 7 17:14:00 2016 +0100

----------------------------------------------------------------------
 .../InputChannelDeploymentDescriptor.java          | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4410c04a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index 9b3ce5f..9bf3bd5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -131,8 +131,21 @@ public class InputChannelDeploymentDescriptor implements Serializable {
 			else if (allowLazyDeployment) {
 				// The producing task might not have registered the partition yet
 				partitionLocation = ResultPartitionLocation.createUnknown();
-			} else {
-				throw new ExecutionGraphException("Trying to eagerly schedule a task whose inputs are not ready.");
+			}
+			else if (producerState == ExecutionState.CANCELING
+						|| producerState == ExecutionState.CANCELED
+						|| producerState == ExecutionState.FAILED) {
+				String msg = "Trying to schedule a task whose inputs were canceled or failed. " +
+					"The producer is in state " + producerState + ".";
+				throw new ExecutionGraphException(msg);
+			}
+			else {
+				String msg = String.format("Trying to eagerly schedule a task whose inputs " +
+					"are not ready (partition consumable? %s, producer state: %s, producer slot: %s).",
+						consumedPartition.isConsumable(),
+						producerState,
+						producerSlot);
+				throw new ExecutionGraphException(msg);
 			}
 
 			final ResultPartitionID consumedPartitionId = new ResultPartitionID(