You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/08/27 23:59:43 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-864] add job error message in job state

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bf9a88  [GOBBLIN-864] add job error message in job state
9bf9a88 is described below

commit 9bf9a882427e98e7f4ef089c4ca1bde42f4b36a3
Author: Arjun <ab...@linkedin.com>
AuthorDate: Tue Aug 27 16:59:35 2019 -0700

    [GOBBLIN-864] add job error message in job state
    
    Closes #2720 from arjun4084346/storeErrorMessages
---
 .../gobblin/configuration/ConfigurationKeys.java   |  1 +
 .../cluster/ClusterEventMetadataGenerator.java     |  4 ++-
 .../gobblin/runtime/AbstractJobLauncher.java       |  5 ++-
 .../apache/gobblin/runtime/EventMetadataUtils.java | 21 ++++++++++++
 .../org/apache/gobblin/runtime/JobContext.java     |  4 ++-
 .../java/org/apache/gobblin/runtime/JobState.java  | 38 +++++++++++++++++++++-
 .../apache/gobblin/runtime/SafeDatasetCommit.java  | 11 +++++--
 7 files changed, 78 insertions(+), 6 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index f873a0d..3538c89 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -243,6 +243,7 @@ public class ConfigurationKeys {
   public static final String TASK_ATTEMPT_ID_KEY = "task.AttemptId";
   public static final String JOB_CONFIG_FILE_PATH_KEY = "job.config.path";
   public static final String TASK_FAILURE_EXCEPTION_KEY = "task.failure.exception";
+  public static final String JOB_FAILURE_EXCEPTION_KEY = "job.failure.exception";
   public static final String TASK_RETRIES_KEY = "task.retries";
   public static final String TASK_IGNORE_CLOSE_FAILURES = "task.ignoreCloseFailures";
   public static final String JOB_FAILURES_KEY = "job.failures";
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java
index 842e285..05f7fb6 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java
@@ -41,12 +41,14 @@ public class ClusterEventMetadataGenerator implements EventMetadataGenerator{
 
   public Map<String, String> getMetadata(JobContext jobContext, EventName eventName) {
     List<TaskState> taskStates = jobContext.getJobState().getTaskStates();
+    String taskException = EventMetadataUtils.getTaskFailureExceptions(taskStates);
+    String jobException = EventMetadataUtils.getJobFailureExceptions(jobContext.getJobState());
 
     switch (eventName) {
       case JOB_COMPLETE:
         return ImmutableMap.of(PROCESSED_COUNT_KEY, Long.toString(EventMetadataUtils.getProcessedCount(taskStates)));
       case JOB_FAILED:
-        return ImmutableMap.of(MESSAGE_KEY, EventMetadataUtils.getTaskFailureExceptions(taskStates));
+        return ImmutableMap.of(MESSAGE_KEY, taskException.length() != 0 ? taskException : jobException);
       default:
         break;
     }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 05603e5..1598848 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -367,7 +367,9 @@ public abstract class AbstractJobLauncher implements JobLauncher {
         if (workUnitStream == null || workUnitStream.getWorkUnits() == null) {
           this.eventSubmitter.submit(JobEvent.WORK_UNITS_MISSING);
           jobState.setState(JobState.RunningState.FAILED);
-          throw new JobException("Failed to get work units for job " + jobId);
+          String errMsg = "Failed to get work units for job " + jobId;
+          this.jobContext.getJobState().setJobFailureMessage(errMsg);
+          throw new JobException(errMsg);
         }
 
         // No work unit to run
@@ -468,6 +470,7 @@ public abstract class AbstractJobLauncher implements JobLauncher {
         jobState.setState(JobState.RunningState.FAILED);
         String errMsg = "Failed to launch and run job " + jobId;
         LOG.error(errMsg + ": " + t, t);
+        this.jobContext.getJobState().setJobFailureException(t);
       } finally {
         try {
           TimingEvent jobCleanupTimer = this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CLEANUP);
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/EventMetadataUtils.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/EventMetadataUtils.java
index ba3490b..9514eb4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/EventMetadataUtils.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/EventMetadataUtils.java
@@ -27,6 +27,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
  */
 public class EventMetadataUtils {
   public static final String TASK_FAILURE_MESSAGE_KEY = "task.failure.message";
+  public static final String JOB_FAILURE_MESSAGE_KEY = "job.failure.message";
 
   /**
    * Get the number of records written by all the writers
@@ -44,6 +45,26 @@ public class EventMetadataUtils {
 
   /**
    * Get failure messages
+   * @return The failure messages from the job state
+   */
+  public static String getJobFailureExceptions(JobState jobState) {
+    StringBuilder sb = new StringBuilder(); // method-local buffer: StringBuffer's locking is unnecessary
+
+    if (jobState.contains(JOB_FAILURE_MESSAGE_KEY)) {
+      sb.append(jobState.getProp(JOB_FAILURE_MESSAGE_KEY));
+    }
+    if (jobState.contains(ConfigurationKeys.JOB_FAILURE_EXCEPTION_KEY)) {
+      if (sb.length() != 0) {
+        sb.append(","); // separate the recorded message(s) from the recorded exception text
+      }
+      sb.append(jobState.getProp(ConfigurationKeys.JOB_FAILURE_EXCEPTION_KEY));
+    }
+
+    return sb.toString();
+  }
+
+  /**
+   * Get failure messages
    * @return The concatenated failure messages from all the task states
    */
   public static String getTaskFailureExceptions(List<TaskState> taskStates) {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
index 2ed8711..fd70141 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
@@ -476,7 +476,9 @@ public class JobContext implements Closeable {
 
       if (!IteratorExecutor.verifyAllSuccessful(result)) {
         this.jobState.setState(JobState.RunningState.FAILED);
-        throw new IOException("Failed to commit dataset state for some dataset(s) of job " + this.jobId);
+        String errMsg = "Failed to commit dataset state for some dataset(s) of job " + this.jobId;
+        this.jobState.setJobFailureMessage(errMsg);
+        throw new IOException(errMsg);
       }
     } catch (InterruptedException exc) {
       throw new IOException(exc);
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
index 45d3876..02cf3c9 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
@@ -29,7 +29,8 @@ import java.util.Properties;
 
 import org.apache.gobblin.metastore.DatasetStateStore;
 import org.apache.gobblin.runtime.job.JobProgress;
-import org.apache.gobblin.runtime.job.TaskProgress;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.io.Text;
 
 import com.codahale.metrics.Counter;
@@ -38,6 +39,7 @@ import com.codahale.metrics.Meter;
 import com.google.common.base.Enums;
 import com.google.common.base.Optional;
 import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
@@ -327,6 +329,40 @@ public class JobState extends SourceState implements JobProgress {
   }
 
   /**
+   * Prepend the stack trace of the given {@link Throwable} to {@link ConfigurationKeys#JOB_FAILURE_EXCEPTION_KEY},
+   * separated from any previously recorded stack traces by a blank line.
+   */
+  public void setJobFailureException(Throwable jobFailureException) {
+    String previousExceptions = this.getProp(ConfigurationKeys.JOB_FAILURE_EXCEPTION_KEY);
+    String currentException = Throwables.getStackTraceAsString(jobFailureException);
+    String aggregatedExceptions;
+
+    if (StringUtils.isEmpty(previousExceptions)) {
+      aggregatedExceptions = currentException;
+    } else {
+      aggregatedExceptions = currentException + "\n\n" + previousExceptions; // newest stack trace first
+    }
+
+    this.setProp(ConfigurationKeys.JOB_FAILURE_EXCEPTION_KEY, aggregatedExceptions);
+  }
+
+  /**
+   * Prepend the given message to the messages stored under {@link EventMetadataUtils#JOB_FAILURE_MESSAGE_KEY}.
+   */
+  public void setJobFailureMessage(String jobFailureMessage) {
+    String previousMessages = this.getProp(EventMetadataUtils.JOB_FAILURE_MESSAGE_KEY); // same key written below
+    String aggregatedMessages;
+
+    if (StringUtils.isEmpty(previousMessages)) {
+      aggregatedMessages = jobFailureMessage;
+    } else {
+      aggregatedMessages = jobFailureMessage + ", " + previousMessages; // newest message first
+    }
+
+    this.setProp(EventMetadataUtils.JOB_FAILURE_MESSAGE_KEY, aggregatedMessages);
+  }
+
+  /**
    * Increment the number of tasks by 1.
    */
   public void incrementTaskCount() {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index b271652..d9f63b1 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -326,6 +326,11 @@ final class SafeDatasetCommit implements Callable<Void> {
         // The dataset state is set to FAILED if any task failed and COMMIT_ON_FULL_SUCCESS is used
         datasetState.setState(JobState.RunningState.FAILED);
         datasetState.incrementJobFailures();
+        Optional<String> taskStateException = taskState.getTaskFailureException();
+        String errMsg = "At least one task did not committed successfully. Setting dataset state to FAILED. "
+            + (taskStateException.isPresent() ? taskStateException.get() : "Exception not set.");
+        log.warn(errMsg);
+        datasetState.setJobFailureMessage(errMsg);
         return;
       }
     }
@@ -401,8 +406,10 @@ final class SafeDatasetCommit implements Callable<Void> {
           //    dataset failed to be committed.
           datasetState.setState(JobState.RunningState.FAILED);
           Optional<String> taskStateException = taskState.getTaskFailureException();
-          log.warn("At least one task did not committed successfully. Setting dataset state to FAILED.",
-              taskStateException.isPresent() ? taskStateException.get() : "Exception not set.");
+          String errMsg = "At least one task did not committed successfully. Setting dataset state to FAILED. "
+              + (taskStateException.isPresent() ? taskStateException.get() : "Exception not set.");
+          log.warn(errMsg);
+          datasetState.setJobFailureMessage(errMsg);
         }
       }
     }