You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/04 18:39:15 UTC

[3/3] flink git commit: [jobmanager] Improve error message in case task deployment times out.

[jobmanager] Improve error message in case task deployment times out.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9c641374
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9c641374
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9c641374

Branch: refs/heads/master
Commit: 9c6413740b759d0e8715053b2339e2c02b38124d
Parents: 5385e48
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Mar 3 22:16:34 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Mar 4 18:20:37 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java     | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9c641374/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index bb75088..bd76089 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -57,6 +57,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -205,7 +206,9 @@ public class Execution implements Serializable {
 	 *       to be scheduled immediately and no resource is available. If the task is accepted by the schedule, any
 	 *       error sets the vertex state to failed and triggers the recovery logic.
 	 * 
-	 * @param scheduler
+	 * @param scheduler The scheduler to use to schedule this execution attempt.
+	 * @param queued Flag to indicate whether the scheduler may queue this task if it cannot
+	 *               immediately deploy it.
 	 * 
 	 * @throws IllegalStateException Thrown, if the vertex is not in CREATED state, which is the only state that permits scheduling.
 	 * @throws NoResourceAvailableException Thrown is no queued scheduling is allowed and no resources are currently available.
@@ -329,14 +332,19 @@ public class Execution implements Serializable {
 				@Override
 				public void onComplete(Throwable failure, Object success) throws Throwable {
 					if (failure != null) {
-						markFailed(failure);
+						if (failure instanceof TimeoutException) {
+							markFailed(new Exception("Cannot deploy task - TaskManager not responding.", failure));
+						}
+						else {
+							markFailed(failure);
+						}
 					}
 					else {
 						if (success == null) {
 							markFailed(new Exception("Failed to deploy the task to slot " + slot + ": TaskOperationResult was null"));
 						}
 
-						if(success instanceof TaskOperationResult) {
+						if (success instanceof TaskOperationResult) {
 							TaskOperationResult result = (TaskOperationResult) success;
 
 							if (!result.executionID().equals(attemptId)) {
@@ -349,7 +357,7 @@ public class Execution implements Serializable {
 										getVertexWithAttempt() + " to slot " + slot + ": " + result
 										.description()));
 							}
-						}else {
+						} else {
 							markFailed(new Exception("Failed to deploy the task to slot " + slot +
 									": Response was not of type TaskOperationResult"));
 						}