You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/07 16:14:09 UTC
[2/2] flink git commit: [FLINK-5275] [execgraph] Give more detailed error message if InputChannel deployment fails
[FLINK-5275] [execgraph] Give more detailed error message if InputChannel deployment fails
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4410c04a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4410c04a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4410c04a
Branch: refs/heads/master
Commit: 4410c04a68c7b247bb3d7113e5f40f2a9c2165af
Parents: 555a687
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Dec 7 13:48:25 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Dec 7 17:14:00 2016 +0100
----------------------------------------------------------------------
.../InputChannelDeploymentDescriptor.java | 17 +++++++++++++++--
1 file changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4410c04a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index 9b3ce5f..9bf3bd5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -131,8 +131,21 @@ public class InputChannelDeploymentDescriptor implements Serializable {
else if (allowLazyDeployment) {
// The producing task might not have registered the partition yet
partitionLocation = ResultPartitionLocation.createUnknown();
- } else {
- throw new ExecutionGraphException("Trying to eagerly schedule a task whose inputs are not ready.");
+ }
+ else if (producerState == ExecutionState.CANCELING
+ || producerState == ExecutionState.CANCELED
+ || producerState == ExecutionState.FAILED) {
+ String msg = "Trying to schedule a task whose inputs were canceled or failed. " +
+ "The producer is in state " + producerState + ".";
+ throw new ExecutionGraphException(msg);
+ }
+ else {
+ String msg = String.format("Trying to eagerly schedule a task whose inputs " +
+ "are not ready (partition consumable? %s, producer state: %s, producer slot: %s).",
+ consumedPartition.isConsumable(),
+ producerState,
+ producerSlot);
+ throw new ExecutionGraphException(msg);
}
final ResultPartitionID consumedPartitionId = new ResultPartitionID(