You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/08/27 23:59:43 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-864] add job error message in job state
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9bf9a88 [GOBBLIN-864] add job error message in job state
9bf9a88 is described below
commit 9bf9a882427e98e7f4ef089c4ca1bde42f4b36a3
Author: Arjun <ab...@linkedin.com>
AuthorDate: Tue Aug 27 16:59:35 2019 -0700
[GOBBLIN-864] add job error message in job state
Closes #2720 from arjun4084346/storeErrorMessages
---
.../gobblin/configuration/ConfigurationKeys.java | 1 +
.../cluster/ClusterEventMetadataGenerator.java | 4 ++-
.../gobblin/runtime/AbstractJobLauncher.java | 5 ++-
.../apache/gobblin/runtime/EventMetadataUtils.java | 21 ++++++++++++
.../org/apache/gobblin/runtime/JobContext.java | 4 ++-
.../java/org/apache/gobblin/runtime/JobState.java | 38 +++++++++++++++++++++-
.../apache/gobblin/runtime/SafeDatasetCommit.java | 11 +++++--
7 files changed, 78 insertions(+), 6 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index f873a0d..3538c89 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -243,6 +243,7 @@ public class ConfigurationKeys {
public static final String TASK_ATTEMPT_ID_KEY = "task.AttemptId";
public static final String JOB_CONFIG_FILE_PATH_KEY = "job.config.path";
public static final String TASK_FAILURE_EXCEPTION_KEY = "task.failure.exception";
+ public static final String JOB_FAILURE_EXCEPTION_KEY = "job.failure.exception";
public static final String TASK_RETRIES_KEY = "task.retries";
public static final String TASK_IGNORE_CLOSE_FAILURES = "task.ignoreCloseFailures";
public static final String JOB_FAILURES_KEY = "job.failures";
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java
index 842e285..05f7fb6 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java
@@ -41,12 +41,14 @@ public class ClusterEventMetadataGenerator implements EventMetadataGenerator{
public Map<String, String> getMetadata(JobContext jobContext, EventName eventName) {
List<TaskState> taskStates = jobContext.getJobState().getTaskStates();
+ String taskException = EventMetadataUtils.getTaskFailureExceptions(taskStates);
+ String jobException = EventMetadataUtils.getJobFailureExceptions(jobContext.getJobState());
switch (eventName) {
case JOB_COMPLETE:
return ImmutableMap.of(PROCESSED_COUNT_KEY, Long.toString(EventMetadataUtils.getProcessedCount(taskStates)));
case JOB_FAILED:
- return ImmutableMap.of(MESSAGE_KEY, EventMetadataUtils.getTaskFailureExceptions(taskStates));
+ return ImmutableMap.of(MESSAGE_KEY, taskException.length() != 0 ? taskException : jobException);
default:
break;
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 05603e5..1598848 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -367,7 +367,9 @@ public abstract class AbstractJobLauncher implements JobLauncher {
if (workUnitStream == null || workUnitStream.getWorkUnits() == null) {
this.eventSubmitter.submit(JobEvent.WORK_UNITS_MISSING);
jobState.setState(JobState.RunningState.FAILED);
- throw new JobException("Failed to get work units for job " + jobId);
+ String errMsg = "Failed to get work units for job " + jobId;
+ this.jobContext.getJobState().setJobFailureMessage(errMsg);
+ throw new JobException(errMsg);
}
// No work unit to run
@@ -468,6 +470,7 @@ public abstract class AbstractJobLauncher implements JobLauncher {
jobState.setState(JobState.RunningState.FAILED);
String errMsg = "Failed to launch and run job " + jobId;
LOG.error(errMsg + ": " + t, t);
+ this.jobContext.getJobState().setJobFailureException(t);
} finally {
try {
TimingEvent jobCleanupTimer = this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CLEANUP);
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/EventMetadataUtils.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/EventMetadataUtils.java
index ba3490b..9514eb4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/EventMetadataUtils.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/EventMetadataUtils.java
@@ -27,6 +27,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
*/
public class EventMetadataUtils {
public static final String TASK_FAILURE_MESSAGE_KEY = "task.failure.message";
+ public static final String JOB_FAILURE_MESSAGE_KEY = "job.failure.message";
/**
* Get the number of records written by all the writers
@@ -44,6 +45,26 @@ public class EventMetadataUtils {
/**
* Get failure messages
+ * @return the job failure message and job failure exception(s) from the job state, comma-separated; empty if unset
+ */
+ public static String getJobFailureExceptions(JobState jobState) {
+ StringBuilder sb = new StringBuilder();
+ // Message (JOB_FAILURE_MESSAGE_KEY) first, then stack traces (JOB_FAILURE_EXCEPTION_KEY).
+ if (jobState.contains(JOB_FAILURE_MESSAGE_KEY)) {
+ sb.append(jobState.getProp(JOB_FAILURE_MESSAGE_KEY));
+ }
+ if (jobState.contains(ConfigurationKeys.JOB_FAILURE_EXCEPTION_KEY)) {
+ if (sb.length() != 0) {
+ sb.append(",");
+ }
+ sb.append(jobState.getProp(ConfigurationKeys.JOB_FAILURE_EXCEPTION_KEY));
+ }
+
+ return sb.toString();
+ }
+
+ /**
+ * Get failure messages
* @return The concatenated failure messages from all the task states
*/
public static String getTaskFailureExceptions(List<TaskState> taskStates) {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
index 2ed8711..fd70141 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
@@ -476,7 +476,9 @@ public class JobContext implements Closeable {
if (!IteratorExecutor.verifyAllSuccessful(result)) {
this.jobState.setState(JobState.RunningState.FAILED);
- throw new IOException("Failed to commit dataset state for some dataset(s) of job " + this.jobId);
+ String errMsg = "Failed to commit dataset state for some dataset(s) of job " + this.jobId;
+ this.jobState.setJobFailureMessage(errMsg);
+ throw new IOException(errMsg);
}
} catch (InterruptedException exc) {
throw new IOException(exc);
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
index 45d3876..02cf3c9 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
@@ -29,7 +29,8 @@ import java.util.Properties;
import org.apache.gobblin.metastore.DatasetStateStore;
import org.apache.gobblin.runtime.job.JobProgress;
-import org.apache.gobblin.runtime.job.TaskProgress;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text;
import com.codahale.metrics.Counter;
@@ -38,6 +39,7 @@ import com.codahale.metrics.Meter;
import com.google.common.base.Enums;
import com.google.common.base.Optional;
import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
@@ -327,6 +329,40 @@ public class JobState extends SourceState implements JobProgress {
}
/**
+ * Set {@link ConfigurationKeys#JOB_FAILURE_EXCEPTION_KEY} to the stack trace of the given {@link Throwable},
+ * prepended (separated by a blank line) to any stack traces already stored under that key.
+ */
+ public void setJobFailureException(Throwable jobFailureException) {
+ String previousExceptions = this.getProp(ConfigurationKeys.JOB_FAILURE_EXCEPTION_KEY);
+ String currentException = Throwables.getStackTraceAsString(jobFailureException);
+ String aggregatedExceptions;
+ // Newest exception first; earlier ones follow after a blank line.
+ if (StringUtils.isEmpty(previousExceptions)) {
+ aggregatedExceptions = currentException;
+ } else {
+ aggregatedExceptions = currentException + "\n\n" + previousExceptions;
+ }
+
+ this.setProp(ConfigurationKeys.JOB_FAILURE_EXCEPTION_KEY, aggregatedExceptions);
+ }
+
+ /**
+ * Set {@link EventMetadataUtils#JOB_FAILURE_MESSAGE_KEY} to the given message, prepended to any messages already stored.
+ */
+ public void setJobFailureMessage(String jobFailureMessage) {
+ String previousMessages = this.getProp(EventMetadataUtils.JOB_FAILURE_MESSAGE_KEY);
+ String aggregatedMessages;
+ // Read back the same key written below; reading JOB_FAILURE_EXCEPTION_KEY would mix stack traces into messages.
+ if (StringUtils.isEmpty(previousMessages)) {
+ aggregatedMessages = jobFailureMessage;
+ } else {
+ aggregatedMessages = jobFailureMessage + ", " + previousMessages;
+ }
+
+ this.setProp(EventMetadataUtils.JOB_FAILURE_MESSAGE_KEY, aggregatedMessages);
+ }
+
+ /**
* Increment the number of tasks by 1.
*/
public void incrementTaskCount() {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index b271652..d9f63b1 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -326,6 +326,11 @@ final class SafeDatasetCommit implements Callable<Void> {
// The dataset state is set to FAILED if any task failed and COMMIT_ON_FULL_SUCCESS is used
datasetState.setState(JobState.RunningState.FAILED);
datasetState.incrementJobFailures();
+ Optional<String> taskStateException = taskState.getTaskFailureException();
+ String errMsg = "At least one task did not committed successfully. Setting dataset state to FAILED. "
+ + (taskStateException.isPresent() ? taskStateException.get() : "Exception not set.");
+ log.warn(errMsg);
+ datasetState.setJobFailureMessage(errMsg);
return;
}
}
@@ -401,8 +406,10 @@ final class SafeDatasetCommit implements Callable<Void> {
// dataset failed to be committed.
datasetState.setState(JobState.RunningState.FAILED);
Optional<String> taskStateException = taskState.getTaskFailureException();
- log.warn("At least one task did not committed successfully. Setting dataset state to FAILED.",
- taskStateException.isPresent() ? taskStateException.get() : "Exception not set.");
+ String errMsg = "At least one task did not committed successfully. Setting dataset state to FAILED. "
+ + (taskStateException.isPresent() ? taskStateException.get() : "Exception not set.");
+ log.warn(errMsg);
+ datasetState.setJobFailureMessage(errMsg);
}
}
}