You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/04 17:10:21 UTC
[3/9] flink git commit: [FLINK-6043] [web] Display exception timestamp
[FLINK-6043] [web] Display exception timestamp
This closes #3583.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0f91083b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0f91083b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0f91083b
Branch: refs/heads/master
Commit: 0f91083b2b497485b4375935ed7397583a03ca7c
Parents: 957cced
Author: zentol <ch...@apache.org>
Authored: Wed Mar 29 14:46:19 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 4 15:04:16 2017 +0200
----------------------------------------------------------------------
.../handlers/JobExceptionsHandler.java | 11 ++-
.../handlers/JobExceptionsHandlerTest.java | 5 +-
.../utils/ArchivedExecutionGraphBuilder.java | 7 +-
.../utils/ArchivedJobGenerationUtils.java | 3 +-
.../app/partials/jobs/job.exceptions.jade | 14 +++-
.../executiongraph/AccessExecutionGraph.java | 4 +-
.../executiongraph/ArchivedExecutionGraph.java | 6 +-
.../flink/runtime/executiongraph/ErrorInfo.java | 85 ++++++++++++++++++++
.../runtime/executiongraph/ExecutionGraph.java | 56 +++++++++----
.../ArchivedExecutionGraphTest.java | 2 +-
.../ExecutionGraphSuspendTest.java | 2 +-
.../query/AbstractQueryableStateITCase.java | 2 +-
12 files changed, 165 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0f91083b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
index 181b270..e31299b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
@@ -18,8 +18,10 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
@@ -78,9 +80,10 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
gen.writeStartObject();
// most important is the root failure cause
- String rootException = graph.getFailureCauseAsString();
- if (rootException != null && !rootException.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
- gen.writeStringField("root-exception", rootException);
+ ErrorInfo rootException = graph.getFailureCause();
+ if (rootException != null && !rootException.getExceptionAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
+ gen.writeStringField("root-exception", rootException.getExceptionAsString());
+ gen.writeNumberField("timestamp", rootException.getTimestamp());
}
// we additionally collect all exceptions (up to a limit) that occurred in the individual tasks
@@ -105,6 +108,8 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
gen.writeStringField("exception", t);
gen.writeStringField("task", task.getTaskNameWithSubtaskIndex());
gen.writeStringField("location", locationString);
+ long timestamp = task.getStateTimestamp(ExecutionState.FAILED);
+ gen.writeNumberField("timestamp", timestamp == 0 ? -1 : timestamp);
gen.writeEndObject();
numExceptionsSoFar++;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f91083b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
index f3df225..6016d01 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -71,7 +72,8 @@ public class JobExceptionsHandlerTest {
private static void compareExceptions(AccessExecutionGraph originalJob, String json) throws IOException {
JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
- Assert.assertEquals(originalJob.getFailureCauseAsString(), result.get("root-exception").asText());
+ Assert.assertEquals(originalJob.getFailureCause().getExceptionAsString(), result.get("root-exception").asText());
+ Assert.assertEquals(originalJob.getFailureCause().getTimestamp(), result.get("timestamp").asLong());
ArrayNode exceptions = (ArrayNode) result.get("all-exceptions");
@@ -81,6 +83,7 @@ public class JobExceptionsHandlerTest {
JsonNode exception = exceptions.get(x);
Assert.assertEquals(expectedSubtask.getFailureCauseAsString(), exception.get("exception").asText());
+ Assert.assertEquals(expectedSubtask.getStateTimestamp(ExecutionState.FAILED), exception.get("timestamp").asLong());
Assert.assertEquals(expectedSubtask.getTaskNameWithSubtaskIndex(), exception.get("task").asText());
TaskManagerLocation location = expectedSubtask.getCurrentAssignedResourceLocation();
http://git-wip-us.apache.org/repos/asf/flink/blob/0f91083b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java
index 935663c..57b300a 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.Preconditions;
@@ -47,7 +48,7 @@ public class ArchivedExecutionGraphBuilder {
private List<ArchivedExecutionJobVertex> verticesInCreationOrder;
private long[] stateTimestamps;
private JobStatus state;
- private String failureCause;
+ private ErrorInfo failureCause;
private String jsonPlan;
private StringifiedAccumulatorResult[] archivedUserAccumulators;
private ArchivedExecutionConfig archivedExecutionConfig;
@@ -85,7 +86,7 @@ public class ArchivedExecutionGraphBuilder {
return this;
}
- public ArchivedExecutionGraphBuilder setFailureCause(String failureCause) {
+ public ArchivedExecutionGraphBuilder setFailureCause(ErrorInfo failureCause) {
this.failureCause = failureCause;
return this;
}
@@ -126,7 +127,7 @@ public class ArchivedExecutionGraphBuilder {
verticesInCreationOrder != null ? verticesInCreationOrder : new ArrayList<>(tasks.values()),
stateTimestamps != null ? stateTimestamps : new long[JobStatus.values().length],
state != null ? state : JobStatus.FINISHED,
- failureCause != null ? failureCause : "(null)",
+ failureCause,
jsonPlan != null ? jsonPlan : "{\"jobid\":\"" + jobID + "\", \"name\":\"" + jobName + "\", \"nodes\":[]}",
archivedUserAccumulators != null ? archivedUserAccumulators : new StringifiedAccumulatorResult[0],
serializedUserAccumulators != null ? serializedUserAccumulators : Collections.<String, SerializedValue<Object>>emptyMap(),
http://git-wip-us.apache.org/repos/asf/flink/blob/0f91083b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
index dee04f2..3e4fc01 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecution;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -132,7 +133,7 @@ public class ArchivedJobGenerationUtils {
originalJob = new ArchivedExecutionGraphBuilder()
.setJobID(new JobID())
.setTasks(tasks)
- .setFailureCause("jobException")
+ .setFailureCause(new ErrorInfo(new Exception("jobException"), originalAttempt.getStateTimestamp(ExecutionState.FAILED)))
.setState(JobStatus.FINISHED)
.setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
.setArchivedUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2})
http://git-wip-us.apache.org/repos/asf/flink/blob/0f91083b/flink-runtime-web/web-dashboard/app/partials/jobs/job.exceptions.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.exceptions.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.exceptions.jade
index 1aa10b0..eee19f5 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.exceptions.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.exceptions.jade
@@ -20,6 +20,12 @@
.panel-title
| Root exception
+ .panel-heading.clearfix
+ .panel-info.thin.last(ng-if="exceptions['timestamp'] != -1")
+ strong Timestamp:
+ = ' '
+ | {{ exceptions['timestamp'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}
+
.panel-body
pre.exception
| {{ exceptions['root-exception'] }}
@@ -30,8 +36,14 @@
| {{ exception.task }}
.panel-heading.clearfix
+ .panel-info.thin(ng-if="exception.timestamp != -1")
+ strong Timestamp:
+ = ' '
+ | {{ exception.timestamp | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}
.panel-info.thin.last
- span {{ exception.location }}
+ strong Location:
+ = ' '
+ | {{ exception.location }}
.panel-body
pre.exception
http://git-wip-us.apache.org/repos/asf/flink/blob/0f91083b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index 3b064c3..df4ed2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -66,9 +66,9 @@ public interface AccessExecutionGraph {
* Returns the exception that caused the job to fail. This is the first root exception
* that was not recoverable and triggered job failure.
*
- * @return failure causing exception as a string, or {@code "(null)"}
+ * @return failure causing exception, or null
*/
- String getFailureCauseAsString();
+ ErrorInfo getFailureCause();
/**
* Returns the job vertex for the given {@link JobVertexID}.
http://git-wip-us.apache.org/repos/asf/flink/blob/0f91083b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index b9db1e7..901b973 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -71,7 +71,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
* The exception that caused the job to fail. This is set to the first root exception
* that was not recoverable and triggered job failure
*/
- private final String failureCause;
+ private final ErrorInfo failureCause;
// ------ Fields that are only relevant for archived execution graphs ------------
private final String jsonPlan;
@@ -93,7 +93,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
List<ArchivedExecutionJobVertex> verticesInCreationOrder,
long[] stateTimestamps,
JobStatus state,
- String failureCause,
+ ErrorInfo failureCause,
String jsonPlan,
StringifiedAccumulatorResult[] archivedUserAccumulators,
Map<String, SerializedValue<Object>> serializedUserAccumulators,
@@ -141,7 +141,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
}
@Override
- public String getFailureCauseAsString() {
+ public ErrorInfo getFailureCause() {
return failureCause;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f91083b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
new file mode 100644
index 0000000..d919bfa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * Simple container to hold an exception and the corresponding timestamp.
+ *
+ * <p>The exception itself is transient: only its string form survives Java serialization.
+ */
+public class ErrorInfo implements Serializable {
+
+	private static final long serialVersionUID = -6138942031953594202L;
+
+	/** The original exception; {@code null} after deserialization since the field is transient. */
+	private final transient Throwable exception;
+	private final long timestamp;
+
+	/** Lazily computed string form of the exception; always populated before serialization. */
+	private volatile String exceptionAsString;
+
+	/**
+	 * Creates a new ErrorInfo.
+	 *
+	 * @param exception exception that caused the failure, must not be null
+	 * @param timestamp time of the failure, e.g. {@code System.currentTimeMillis()}; must be positive
+	 */
+	public ErrorInfo(Throwable exception, long timestamp) {
+		Preconditions.checkNotNull(exception, "exception must not be null");
+		Preconditions.checkArgument(timestamp > 0, "timestamp must be positive, but was %s", timestamp);
+
+		this.exception = exception;
+		this.timestamp = timestamp;
+	}
+
+	/** Returns the contained exception, or {@code null} if this object has been deserialized. */
+	Throwable getException() {
+		return exception;
+	}
+
+	/**
+	 * Returns the contained exception as a string.
+	 *
+	 * @return stringified exception; computed on first access and retained across serialization
+	 */
+	public String getExceptionAsString() {
+		if (exceptionAsString == null) {
+			exceptionAsString = ExceptionUtils.stringifyException(exception);
+		}
+		return exceptionAsString;
+	}
+
+	/** Returns the timestamp of the contained exception; always positive. */
+	public long getTimestamp() {
+		return timestamp;
+	}
+
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		// the exception is transient; stringify it now so it is not lost during serialization
+		getExceptionAsString();
+		out.defaultWriteObject();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f91083b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 7c13936..d4f7551 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -261,7 +261,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
/** The exception that caused the job to fail. This is set to the first root exception
* that was not recoverable and triggered job failure */
- private volatile Throwable failureCause;
+ private volatile ErrorInfo failureCause;
// ------ Fields that are relevant to the execution and need to be cleared before archiving -------
@@ -593,7 +593,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
return state;
}
- public Throwable getFailureCause() {
+ public ErrorInfo getFailureCause() {
return failureCause;
}
@@ -610,11 +610,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
@Override
- public String getFailureCauseAsString() {
- return ExceptionUtils.stringifyException(failureCause);
- }
-
- @Override
public ExecutionJobVertex getJobVertex(JobVertexID id) {
return this.tasks.get(id);
}
@@ -1034,6 +1029,25 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
* @param suspensionCause Cause of the suspension
*/
public void suspend(Throwable suspensionCause) {
+ suspend(new ErrorInfo(suspensionCause, System.currentTimeMillis()));
+ }
+
+ /**
+ * Suspends the current ExecutionGraph.
+ *
+ * The JobStatus will be directly set to SUSPENDED iff the current state is not a terminal
+ * state. All ExecutionJobVertices will be canceled and the postRunCleanup is executed.
+ *
+ * The SUSPENDED state is a local terminal state which stops the execution of the job but does
+ * not remove the job from the HA job store so that it can be recovered by another JobManager.
+ *
+ * @param errorInfo ErrorInfo containing the cause of the suspension
+ */
+ public void suspend(ErrorInfo errorInfo) {
+ Throwable suspensionCause = errorInfo != null
+ ? errorInfo.getException()
+ : null;
+
while (true) {
JobStatus currentState = state;
@@ -1041,7 +1055,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
// stay in a terminal state
return;
} else if (transitionState(currentState, JobStatus.SUSPENDED, suspensionCause)) {
- this.failureCause = suspensionCause;
+ this.failureCause = errorInfo;
// make sure no concurrent local actions interfere with the cancellation
incrementGlobalModVersion();
@@ -1061,6 +1075,10 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
}
+ public void failGlobal(Throwable suspensionCause) {
+ failGlobal(new ErrorInfo(suspensionCause, System.currentTimeMillis()));
+ }
+
/**
* Fails the execution graph globally. This failure will not be recovered by a specific
* failover strategy, but results in a full restart of all tasks.
@@ -1070,9 +1088,13 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
* exceptions that indicate a bug or an unexpected call race), and where a full restart is the
* safe way to get consistency back.
*
- * @param t The exception that caused the failure.
+ * @param errorInfo ErrorInfo containing the exception that caused the failure.
*/
- public void failGlobal(Throwable t) {
+ public void failGlobal(ErrorInfo errorInfo) {
+ Throwable t = errorInfo != null
+ ? errorInfo.getException()
+ : null;
+
while (true) {
JobStatus current = state;
// stay in these states
@@ -1082,14 +1104,14 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
return;
}
else if (current == JobStatus.RESTARTING) {
- this.failureCause = t;
+ this.failureCause = errorInfo;
if (tryRestartOrFail()) {
return;
}
}
- else if (transitionState(current, JobStatus.FAILING, t)) {
- this.failureCause = t;
+ else if (transitionState(current, JobStatus.FAILING,t)) {
+ this.failureCause = errorInfo;
// make sure no concurrent local actions interfere with the cancellation
incrementGlobalModVersion();
@@ -1375,6 +1397,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
JobStatus currentState = state;
if (currentState == JobStatus.FAILING || currentState == JobStatus.RESTARTING) {
+ Throwable failureCause = this.failureCause != null
+ ? this.failureCause.getException()
+ : null;
synchronized (progressLock) {
if (LOG.isDebugEnabled()) {
LOG.debug("Try to restart or fail the job {} ({}) if no longer possible.", getJobName(), getJobID(), failureCause);
@@ -1633,6 +1658,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
// see what this means for us. currently, the first FAILED state means -> FAILED
if (newExecutionState == ExecutionState.FAILED) {
final Throwable ex = error != null ? error : new FlinkException("Unknown Error (missing cause)");
+ long timestamp = execution.getStateTimestamp(ExecutionState.FAILED);
// by filtering out late failure calls, we can save some work in
// avoiding redundant local failover
@@ -1643,7 +1669,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
catch (Throwable t) {
// bug in the failover strategy - fall back to global failover
LOG.warn("Error in failover strategy - falling back to global restart", t);
- failGlobal(ex);
+ failGlobal(new ErrorInfo(ex, timestamp));
}
}
}
@@ -1674,7 +1700,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
archivedVerticesInCreationOrder,
stateTimestamps,
getState(),
- getFailureCauseAsString(),
+ failureCause,
getJsonPlan(),
getAccumulatorResultsStringified(),
serializedUserAccumulators,
http://git-wip-us.apache.org/repos/asf/flink/blob/0f91083b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 4e1d0f7..5ffc4ae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -167,7 +167,7 @@ public class ArchivedExecutionGraphTest {
assertEquals(runtimeGraph.getJobID(), archivedGraph.getJobID());
assertEquals(runtimeGraph.getJobName(), archivedGraph.getJobName());
assertEquals(runtimeGraph.getState(), archivedGraph.getState());
- assertEquals(runtimeGraph.getFailureCauseAsString(), archivedGraph.getFailureCauseAsString());
+ assertEquals(runtimeGraph.getFailureCause().getExceptionAsString(), archivedGraph.getFailureCause().getExceptionAsString());
assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.CREATED), archivedGraph.getStatusTimestamp(JobStatus.CREATED));
assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.RUNNING), archivedGraph.getStatusTimestamp(JobStatus.RUNNING));
assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.FAILING), archivedGraph.getStatusTimestamp(JobStatus.FAILING));
http://git-wip-us.apache.org/repos/asf/flink/blob/0f91083b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
index b3af005..b3a8c33 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
@@ -251,7 +251,7 @@ public class ExecutionGraphSuspendTest {
assertEquals(JobStatus.SUSPENDED, eg.getState());
- assertEquals(exception, eg.getFailureCause());
+ assertEquals(exception, eg.getFailureCause().getException());
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0f91083b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
index 6c8e758..f07113d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
@@ -365,7 +365,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
.mapTo(ClassTag$.MODULE$.<JobFound>apply(JobFound.class)),
deadline.timeLeft());
- String failureCause = jobFound.executionGraph().getFailureCauseAsString();
+ String failureCause = jobFound.executionGraph().getFailureCause().getExceptionAsString();
assertTrue("Not instance of SuppressRestartsException", failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException"));
int causedByIndex = failureCause.indexOf("Caused by: ");