You are viewing a plain-text version of this content. The link to the canonical version is not preserved in this plain-text rendering.
Posted to commits@gobblin.apache.org by ib...@apache.org on 2017/11/07 23:25:54 UTC

incubator-gobblin git commit: [GOBBLIN-273] Add job failure monitoring

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master c385f1ddd -> 6198120e3


[GOBBLIN-273] Add job failure monitoring

Closes #2125 from zxcware/msg


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6198120e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6198120e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6198120e

Branch: refs/heads/master
Commit: 6198120e3f9f969064ee1bc2686f38f41ec32172
Parents: c385f1d
Author: zhchen <zh...@linkedin.com>
Authored: Tue Nov 7 15:25:46 2017 -0800
Committer: Issac Buenrostro <ib...@apache.org>
Committed: Tue Nov 7 15:25:46 2017 -0800

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |   1 +
 .../apache/gobblin/metrics/MetricContext.java   |  25 ++--
 .../metrics/event/FailureEventBuilder.java      | 116 +++++++++++++++++++
 .../reporter/FileFailureEventReporter.java      |  94 +++++++++++++++
 .../reporter/OutputStreamEventReporter.java     |   3 +-
 .../reporter/FileFailureEventReporterTest.java  |  69 +++++++++++
 .../apache/gobblin/metrics/GobblinMetrics.java  |  41 +++++++
 .../gobblin/r2/R2RestResponseHandler.java       |  25 ++--
 .../apache/gobblin/writer/AsyncHttpWriter.java  |  29 ++++-
 .../org/apache/gobblin/runtime/JobState.java    |  26 ++++-
 .../gobblin/runtime/SafeDatasetCommit.java      |  16 +++
 .../java/org/apache/gobblin/runtime/Task.java   |  11 ++
 .../plugins/email/EmailNotificationPlugin.java  |   4 +-
 .../gobblin/runtime/TaskContinuousTest.java     |   2 +
 .../org/apache/gobblin/runtime/TaskTest.java    |   2 +
 15 files changed, 437 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 0ef8416..c8de615 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -618,6 +618,7 @@ public class ConfigurationKeys {
   public static final String METRICS_LOG_DIR_KEY = METRICS_CONFIGURATIONS_PREFIX + "log.dir";
   public static final String METRICS_FILE_SUFFIX = METRICS_CONFIGURATIONS_PREFIX + "reporting.file.suffix";
   public static final String DEFAULT_METRICS_FILE_SUFFIX = "";
+  public static final String FAILURE_LOG_DIR_KEY =  "failure.log.dir";
 
   // JMX-based reporting
   public static final String METRICS_REPORTING_JMX_ENABLED_KEY =

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
index e3dd939..dcc1029 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
@@ -184,6 +184,20 @@ public class MetricContext extends MetricRegistry implements ReportableContext,
   }
 
   /**
+   * Inject the tags of this {@link MetricContext} into the metadata of the given {@link GobblinTrackingEvent}; existing metadata entries take precedence over tags with the same key
+   */
+  private void injectTagsToEvent(GobblinTrackingEvent event) {
+    Map<String, String> originalMetadata = event.getMetadata();
+    Map<String, Object> tags = getTagMap();
+    Map<String, String> newMetadata = Maps.newHashMap();
+    for(Map.Entry<String, Object> entry : tags.entrySet()) {
+      newMetadata.put(entry.getKey(), entry.getValue().toString());
+    }
+    newMetadata.putAll(originalMetadata);
+    event.setMetadata(newMetadata);
+  }
+
+  /**
    * Submit {@link org.apache.gobblin.metrics.GobblinTrackingEvent} to all notification listeners attached to this or any
    * ancestor {@link org.apache.gobblin.metrics.MetricContext}s. The argument for this method is mutated by the method, so it
    * should not be reused by the caller.
@@ -193,16 +207,7 @@ public class MetricContext extends MetricRegistry implements ReportableContext,
    */
   public void submitEvent(GobblinTrackingEvent nonReusableEvent) {
     nonReusableEvent.setTimestamp(System.currentTimeMillis());
-
-    // Inject metric context tags into event metadata.
-    Map<String, String> originalMetadata = nonReusableEvent.getMetadata();
-    Map<String, Object> tags = getTagMap();
-    Map<String, String> newMetadata = Maps.newHashMap();
-    for(Map.Entry<String, Object> entry : tags.entrySet()) {
-      newMetadata.put(entry.getKey(), entry.getValue().toString());
-    }
-    newMetadata.putAll(originalMetadata);
-    nonReusableEvent.setMetadata(newMetadata);
+    injectTagsToEvent(nonReusableEvent);
 
     EventNotification notification = new EventNotification(nonReusableEvent);
     sendNotification(notification);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java
new file mode 100644
index 0000000..89f83f5
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.event;
+
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+
+import com.google.common.collect.Maps;
+
+import lombok.Getter;
+
+
+/**
+ * A {@link FailureEventBuilder} builds a specific {@link GobblinTrackingEvent} whose metadata maps
+ * {@value EventSubmitter#EVENT_TYPE} to {@value #EVENT_TYPE}.
+ *
+ * <p>
+ * Note: A {@link FailureEventBuilder} instance is not reusable.
+ */
+public class FailureEventBuilder {
+  private static final String EVENT_TYPE = "FailureEvent";
+  private static final String EVENT_NAMESPACE = "gobblin.event";
+  private static final String ROOT_CAUSE = "rootException";
+
+  @Getter
+  private final String name;
+  @Getter
+  private final String namespace;
+  private final Map<String, String> metadata;
+
+  private Throwable rootCause;
+
+  public FailureEventBuilder(String name) {
+    this(name, EVENT_NAMESPACE);
+  }
+
+  public FailureEventBuilder(String name, String namespace) {
+    this.name = name;
+    this.namespace = namespace;
+    metadata = Maps.newHashMap();
+    metadata.put(EventSubmitter.EVENT_TYPE, EVENT_TYPE);
+  }
+
+  /**
+   * Given a throwable, record its root cause; the root cause's stack trace is added to the metadata when the event is built
+   */
+  public void setRootCause(Throwable t) {
+    rootCause = getRootCause(t);
+  }
+
+  /**
+   * Add a metadata pair
+   */
+  public void addMetadata(String key, String value) {
+    metadata.put(key, value);
+  }
+
+  /**
+   * Add additional metadata
+   */
+  public void addAdditionalMetadata(Map<String, String> additionalMetadata) {
+    metadata.putAll(additionalMetadata);
+  }
+
+  /**
+   * Build as {@link GobblinTrackingEvent}
+   */
+  public GobblinTrackingEvent build() {
+    if (rootCause != null) {
+      metadata.put(ROOT_CAUSE, ExceptionUtils.getStackTrace(rootCause));
+    }
+    return new GobblinTrackingEvent(0L, EVENT_NAMESPACE, name, metadata);
+  }
+
+  /**
+   * Submit the event
+   */
+  public void submit(MetricContext context) {
+    context.submitEvent(build());
+  }
+
+  /**
+   * Check if the given {@link GobblinTrackingEvent} is a failure event
+   */
+  public static boolean isFailureEvent(GobblinTrackingEvent event) {
+    String eventType = event.getMetadata().get(EventSubmitter.EVENT_TYPE);
+    return StringUtils.isNotEmpty(eventType) && eventType.equals(EVENT_TYPE);
+  }
+
+  private static Throwable getRootCause(Throwable t) {
+    Throwable rootCause = ExceptionUtils.getRootCause(t);
+    if (rootCause == null) {
+      rootCause = t;
+    }
+    return rootCause;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporter.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporter.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporter.java
new file mode 100644
index 0000000..42e858e
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.Queue;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.FailureEventBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Charsets;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * An {@link OutputStreamEventReporter} that reports only failure events built by {@link FailureEventBuilder}. It does not create
+ * the failure log file until a failure event is processed.
+ */
+@Slf4j
+public class FileFailureEventReporter extends OutputStreamEventReporter {
+  private final FileSystem fs;
+  private final Path failureLogFile;
+  private volatile boolean hasSetupOutputStream;
+
+  public FileFailureEventReporter(MetricContext context, FileSystem fs, Path failureLogFile)
+      throws IOException {
+    super(OutputStreamEventReporter.forContext(context));
+    this.fs = fs;
+    this.failureLogFile = failureLogFile;
+    hasSetupOutputStream = false;
+  }
+
+  @Override
+  public void addEventToReportingQueue(GobblinTrackingEvent event) {
+    if (FailureEventBuilder.isFailureEvent(event)) {
+      super.addEventToReportingQueue(event);
+    }
+  }
+
+  @Override
+  public void reportEventQueue(Queue<GobblinTrackingEvent> queue) {
+    if (queue.size() > 0) {
+      setupOutputStream();
+      super.reportEventQueue(queue);
+    }
+  }
+
+  /**
+   * Set up the {@link OutputStream} to the {@link #failureLogFile} only once
+   */
+  private void setupOutputStream() {
+    synchronized (failureLogFile) {
+      // Another thread has already set up the output stream
+      if (hasSetupOutputStream) {
+        return;
+      }
+
+      try {
+        boolean append = false;
+        if (fs.exists(failureLogFile)) {
+          log.info("Failure log file %s already exists, appending to it", failureLogFile);
+          append = true;
+        }
+        OutputStream outputStream = append ? fs.append(failureLogFile) : fs.create(failureLogFile);
+        output = this.closer.register(new PrintStream(outputStream, false, Charsets.UTF_8.toString()));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      } finally {
+        hasSetupOutputStream = true;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/OutputStreamEventReporter.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/OutputStreamEventReporter.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/OutputStreamEventReporter.java
index e0415fe..48c4f57 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/OutputStreamEventReporter.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/OutputStreamEventReporter.java
@@ -48,7 +48,8 @@ public class OutputStreamEventReporter extends EventReporter {
   private static final Logger LOGGER = LoggerFactory.getLogger(OutputStreamEventReporter.class);
   private static final int CONSOLE_WIDTH = 80;
 
-  private final PrintStream output;
+  protected PrintStream output;
+
   protected final AvroSerializer<GobblinTrackingEvent> serializer;
   private final ByteArrayOutputStream outputBuffer;
   private final PrintStream outputBufferPrintStream;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporterTest.java b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporterTest.java
new file mode 100644
index 0000000..389e6ab
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporterTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.FailureEventBuilder;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.annotations.Test;
+
+import avro.shaded.com.google.common.collect.Maps;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+
+public class FileFailureEventReporterTest {
+  @Test
+  public void testReport()
+      throws IOException {
+    MetricContext testContext = MetricContext.builder(getClass().getCanonicalName()).build();
+    FileSystem fs = mock(FileSystem.class);
+    Path failureLogPath = mock(Path.class);
+    FSDataOutputStream outputStream = mock(FSDataOutputStream.class);
+
+    FileFailureEventReporter reporter = new FileFailureEventReporter(testContext, fs, failureLogPath);
+    when(fs.exists(any())).thenReturn(true);
+    when(fs.append(any())).thenReturn(outputStream);
+
+    final String eventName = "testEvent";
+    final String eventNamespace = "testNamespace";
+    GobblinTrackingEvent event =
+        new GobblinTrackingEvent(0L, eventNamespace, eventName, Maps.newHashMap());
+
+    // Noop on normal event
+    testContext.submitEvent(event);
+    verify(fs, never()).append(failureLogPath);
+    verify(outputStream, never()).write(anyByte());
+
+    // Process failure event
+    FailureEventBuilder failureEvent = new FailureEventBuilder(eventName, eventNamespace);
+    failureEvent.submit(testContext);
+    reporter.report();
+    // Failure log output is set up
+    verify(fs, times(1)).append(failureLogPath);
+    // Report successfully
+    doAnswer( invocation -> null ).when(outputStream).write(any(byte[].class), anyInt(), anyInt());
+    verify(outputStream, times(1)).write(any(byte[].class), anyInt(), anyInt());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
index e123158..ae20031 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
@@ -26,6 +26,7 @@ import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.gobblin.metrics.reporter.FileFailureEventReporter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -405,6 +406,7 @@ public class GobblinMetrics {
     buildGraphiteMetricReporter(properties);
     buildInfluxDBMetricReporter(properties);
     buildCustomMetricReporters(properties);
+    buildFileFailureEventReporter(properties);
 
     // Start reporters that implement org.apache.gobblin.metrics.report.ScheduledReporter
     RootMetricContext.get().startReporting();
@@ -491,7 +493,9 @@ public class GobblinMetrics {
       }
 
       OutputStream output = append ? fs.append(metricLogFile) : fs.create(metricLogFile, true);
+      // Add metrics reporter
       OutputStreamReporter.Factory.newBuilder().outputTo(output).build(properties);
+      // Also set up an event reporter that writes to the same output stream
       this.codahaleScheduledReporters.add(this.codahaleReportersCloser
           .register(OutputStreamEventReporter.forContext(RootMetricContext.get()).outputTo(output).build()));
 
@@ -501,6 +505,43 @@ public class GobblinMetrics {
     }
   }
 
+  private void buildFileFailureEventReporter(Properties properties) {
+    if (!properties.containsKey(ConfigurationKeys.FAILURE_LOG_DIR_KEY)) {
+      LOGGER.error(
+          "Not reporting failure to log files because " + ConfigurationKeys.FAILURE_LOG_DIR_KEY + " is undefined");
+      return;
+    }
+
+    try {
+      String fsUri = properties.getProperty(ConfigurationKeys.FS_URI_KEY, ConfigurationKeys.LOCAL_FS_URI);
+      FileSystem fs = FileSystem.get(URI.create(fsUri), new Configuration());
+
+      // Each job gets its own log subdirectory
+      Path failureLogDir = new Path(properties.getProperty(ConfigurationKeys.FAILURE_LOG_DIR_KEY), this.getName());
+      if (!fs.exists(failureLogDir) && !fs.mkdirs(failureLogDir)) {
+        LOGGER.error("Failed to create failure log directory for metrics " + this.getName());
+        return;
+      }
+
+      // Add a suffix to file name if specified in properties.
+      String metricsFileSuffix =
+          properties.getProperty(ConfigurationKeys.METRICS_FILE_SUFFIX, ConfigurationKeys.DEFAULT_METRICS_FILE_SUFFIX);
+      if (!Strings.isNullOrEmpty(metricsFileSuffix) && !metricsFileSuffix.startsWith(".")) {
+        metricsFileSuffix = "." + metricsFileSuffix;
+      }
+
+      // Each job run gets its own failure log file
+      Path failureLogFile =
+          new Path(failureLogDir, this.id + metricsFileSuffix + ".failure.log");
+      this.codahaleScheduledReporters.add(this.codahaleReportersCloser
+          .register(new FileFailureEventReporter(RootMetricContext.get(), fs, failureLogFile)));
+
+      LOGGER.info("Will start reporting failure to directory " + failureLogDir);
+    } catch (IOException ioe) {
+      LOGGER.error("Failed to build file failure event reporter for job " + this.id, ioe);
+    }
+  }
+
   private void buildJmxMetricReporter(Properties properties) {
     if (!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_JMX_ENABLED_KEY,
         ConfigurationKeys.DEFAULT_METRICS_REPORTING_JMX_ENABLED))) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/r2/R2RestResponseHandler.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/r2/R2RestResponseHandler.java b/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/r2/R2RestResponseHandler.java
index c947cd4..0ae794b 100644
--- a/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/r2/R2RestResponseHandler.java
+++ b/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/r2/R2RestResponseHandler.java
@@ -19,12 +19,14 @@ package org.apache.gobblin.r2;
 import com.google.common.collect.Maps;
 import com.linkedin.r2.message.rest.RestRequest;
 import com.linkedin.r2.message.rest.RestResponse;
+
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.http.ResponseHandler;
 import org.apache.gobblin.http.StatusType;
 import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.FailureEventBuilder;
 import org.apache.gobblin.net.Request;
 import org.apache.gobblin.utils.HttpConstants;
 import org.apache.gobblin.utils.HttpUtils;
@@ -48,10 +50,9 @@ public class R2RestResponseHandler implements ResponseHandler<RestRequest, RestR
 
   public static final String CONTENT_TYPE_HEADER = "Content-Type";
   private final String R2_RESPONSE_EVENT_NAMESPACE = "r2.response";
-  private final String R2_FAILED_REQUEST = "R2FailedRequest";
+  private final String R2_FAILED_REQUEST_EVENT = "r2FailedRequest";
   private final Set<String> errorCodeWhitelist;
-  MetricContext metricsContext;
-  EventSubmitter eventSubmitter;
+  private MetricContext metricsContext;
 
   public R2RestResponseHandler() {
     this(new HashSet<>(), Instrumented.getMetricContext(new State(), R2RestResponseHandler.class));
@@ -60,7 +61,6 @@ public class R2RestResponseHandler implements ResponseHandler<RestRequest, RestR
   public R2RestResponseHandler(Set<String> errorCodeWhitelist, MetricContext metricContext) {
     this.errorCodeWhitelist = errorCodeWhitelist;
     this.metricsContext = metricContext;
-    eventSubmitter = new EventSubmitter.Builder(metricsContext, R2_RESPONSE_EVENT_NAMESPACE).build();
   }
 
   @Override
@@ -74,11 +74,20 @@ public class R2RestResponseHandler implements ResponseHandler<RestRequest, RestR
       status.setContent(response.getEntity());
       status.setContentType(response.getHeader(CONTENT_TYPE_HEADER));
     } else {
+      log.info("Receive an unsuccessful response with status code: " + statusCode);
+
       Map<String, String> metadata = Maps.newHashMap();
-      metadata.put(HttpConstants.REQUEST, request.toString());
       metadata.put(HttpConstants.STATUS_CODE, String.valueOf(statusCode));
-      eventSubmitter.submit(R2_FAILED_REQUEST, metadata);
-      log.info("Receive an unsuccessful response with status code: " + statusCode);
+      metadata.put(HttpConstants.REQUEST, request.toString());
+      if (status.getType() != StatusType.CONTINUE) {
+        FailureEventBuilder failureEvent = new FailureEventBuilder(R2_FAILED_REQUEST_EVENT);
+        failureEvent.addAdditionalMetadata(metadata);
+        failureEvent.submit(metricsContext);
+      } else {
+        GobblinTrackingEvent event =
+            new GobblinTrackingEvent(0L, R2_RESPONSE_EVENT_NAMESPACE, R2_FAILED_REQUEST_EVENT, metadata);
+        metricsContext.submitEvent(event);
+      }
     }
 
     return status;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/writer/AsyncHttpWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/writer/AsyncHttpWriter.java b/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/writer/AsyncHttpWriter.java
index 3396de9..6c1b105 100644
--- a/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/writer/AsyncHttpWriter.java
+++ b/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/writer/AsyncHttpWriter.java
@@ -20,6 +20,9 @@ package org.apache.gobblin.writer;
 import java.io.IOException;
 import java.util.Queue;
 
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.FailureEventBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +48,8 @@ import org.apache.gobblin.http.ResponseStatus;
 @Slf4j
 public class AsyncHttpWriter<D, RQ, RP> extends AbstractAsyncDataWriter<D> {
   private static final Logger LOG = LoggerFactory.getLogger(AsyncHttpWriter.class);
+  private static final String ASYNC_REQUEST = "asyncRequest";
+  private static final String FATAL_ASYNC_HTTP_WRITE_EVENT = "fatalAsyncHttpWrite";
 
   public static final int DEFAULT_MAX_ATTEMPTS = 3;
 
@@ -53,12 +58,15 @@ public class AsyncHttpWriter<D, RQ, RP> extends AbstractAsyncDataWriter<D> {
   private final AsyncRequestBuilder<D, RQ> requestBuilder;
   private final int maxAttempts;
 
+  private final MetricContext context;
+
   public AsyncHttpWriter(AsyncHttpWriterBuilder builder) {
     super(builder.getQueueCapacity());
     this.httpClient = builder.getClient();
     this.requestBuilder = builder.getAsyncRequestBuilder();
     this.responseHandler = builder.getResponseHandler();
     this.maxAttempts = builder.getMaxAttempts();
+    this.context = Instrumented.getMetricContext(builder.getState(), AsyncHttpWriter.class);
   }
 
   @Override
@@ -82,8 +90,9 @@ public class AsyncHttpWriter<D, RQ, RP> extends AbstractAsyncDataWriter<D> {
           LOG.error("Fail to send request");
           LOG.info(asyncRequest.toString());
 
-          onFailure(asyncRequest, e);
-          throw new DispatchException("Write failed on IOException", e);
+          DispatchException de = new DispatchException("Write failed on IOException", e);
+          onFailure(asyncRequest, de);
+          throw de;
         } else {
           continue;
         }
@@ -152,13 +161,29 @@ public class AsyncHttpWriter<D, RQ, RP> extends AbstractAsyncDataWriter<D> {
 
   /**
    * Callback on failing to send the asyncRequest
+   *
+   * @deprecated Use {@link #onFailure(AsyncRequest, DispatchException)}
    */
+  @Deprecated
   protected void onFailure(AsyncRequest<D, RQ> asyncRequest, Throwable throwable) {
     for (AsyncRequest.Thunk thunk: asyncRequest.getThunks()) {
       thunk.callback.onFailure(throwable);
     }
   }
 
+  protected void onFailure(AsyncRequest<D, RQ> asyncRequest, DispatchException exception) {
+    if (exception.isFatal()) {
+      // Report failure event
+      FailureEventBuilder failureEvent = new FailureEventBuilder(FATAL_ASYNC_HTTP_WRITE_EVENT);
+      failureEvent.setRootCause(exception);
+      failureEvent.addMetadata(ASYNC_REQUEST, asyncRequest.toString());
+      failureEvent.submit(context);
+    }
+    for (AsyncRequest.Thunk thunk : asyncRequest.getThunks()) {
+      thunk.callback.onFailure(exception);
+    }
+  }
+
   @Override
   public void close()
       throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
index 3bd3006..6c16de1 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
@@ -554,11 +554,7 @@ public class JobState extends SourceState {
   public void toJson(JsonWriter jsonWriter, boolean keepConfig)
       throws IOException {
     jsonWriter.beginObject();
-
-    jsonWriter.name("job name").value(this.getJobName()).name("job id").value(this.getJobId()).name("job state")
-        .value(this.getState().name()).name("start time").value(this.getStartTime()).name("end time")
-        .value(this.getEndTime()).name("duration").value(this.getDuration()).name("tasks").value(this.getTaskCount())
-        .name("completed tasks").value(this.getCompletedTasks());
+    writeStateSummary(jsonWriter);
 
     jsonWriter.name("task states");
     jsonWriter.beginArray();
@@ -578,6 +574,19 @@ public class JobState extends SourceState {
     jsonWriter.endObject();
   }
 
+  /**
+   * Write a summary to the json document
+   *
+   * @param jsonWriter a {@link com.google.gson.stream.JsonWriter}
+   *                   used to write the json document
+   */
+  protected void writeStateSummary(JsonWriter jsonWriter) throws IOException {
+    jsonWriter.name("job name").value(this.getJobName()).name("job id").value(this.getJobId()).name("job state")
+        .value(this.getState().name()).name("start time").value(this.getStartTime()).name("end time")
+        .value(this.getEndTime()).name("duration").value(this.getDuration()).name("tasks").value(this.getTaskCount())
+        .name("completed tasks").value(this.getCompletedTasks());
+  }
+
   protected void propsToJson(JsonWriter jsonWriter)
       throws IOException {
     jsonWriter.beginObject();
@@ -805,5 +814,12 @@ public class JobState extends SourceState {
     public void overrideWith(Properties properties) {
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    protected void writeStateSummary(JsonWriter jsonWriter)
+        throws IOException {
+      super.writeStateSummary(jsonWriter);
+      jsonWriter.name("datasetUrn").value(getDatasetUrn());
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index 07d167b..d6a1b58 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -39,6 +39,7 @@ import org.apache.gobblin.lineage.LineageException;
 import org.apache.gobblin.lineage.LineageInfo;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.FailureEventBuilder;
 import org.apache.gobblin.publisher.CommitSequencePublisher;
 import org.apache.gobblin.publisher.DataPublisher;
 import org.apache.gobblin.publisher.UnpublishedHandling;
@@ -63,6 +64,9 @@ final class SafeDatasetCommit implements Callable<Void> {
 
   private static final Object GLOBAL_LOCK = new Object();
 
+  private static final String DATASET_STATE = "datasetState";
+  private static final String FAILED_DATASET_EVENT = "failedDataset";
+
   private final boolean shouldCommitDataInJob;
   private final boolean isJobCancelled;
   private final DeliverySemantics deliverySemantics;
@@ -100,6 +104,8 @@ final class SafeDatasetCommit implements Callable<Void> {
       log.error("Failed to instantiate data publisher for dataset %s of job %s.", this.datasetUrn,
           this.jobContext.getJobId(), roe);
       throw new RuntimeException(roe);
+    } finally {
+      maySubmitFailureEvent(datasetState);
     }
 
     if (this.isJobCancelled) {
@@ -163,7 +169,9 @@ final class SafeDatasetCommit implements Callable<Void> {
     } finally {
       try {
         finalizeDatasetState(datasetState, datasetUrn);
+        maySubmitFailureEvent(datasetState);
         submitLineageEvent(datasetState.getTaskStates());
+
         if (commitSequenceBuilder.isPresent()) {
           buildAndExecuteCommitSequence(commitSequenceBuilder.get(), datasetState, datasetUrn);
           datasetState.setState(JobState.RunningState.COMMITTED);
@@ -181,6 +189,14 @@ final class SafeDatasetCommit implements Callable<Void> {
     return null;
   }
 
+  private void maySubmitFailureEvent(JobState.DatasetState datasetState) {
+    if (datasetState.getState() == JobState.RunningState.FAILED) {
+      FailureEventBuilder failureEvent = new FailureEventBuilder(FAILED_DATASET_EVENT);
+      failureEvent.addMetadata(DATASET_STATE, datasetState.toString());
+      failureEvent.submit(metricContext);
+    }
+  }
+
   private void submitLineageEvent(Collection<TaskState> states) {
     if (states.size() == 0) {
       return;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index 65cf611..3265ab8 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang3.BooleanUtils;
+import org.apache.gobblin.metrics.event.FailureEventBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -114,6 +115,9 @@ public class Task implements TaskIFace {
 
   private static final Logger LOG = LoggerFactory.getLogger(Task.class);
 
+  private static final String TASK_STATE = "taskState";
+  private static final String FAILED_TASK_EVENT = "failedTask";
+
   private final String jobId;
   private final String taskId;
   private final String taskKey;
@@ -448,6 +452,7 @@ public class Task implements TaskIFace {
           }
         } catch (Exception e) {
           if (!(e instanceof DataConversionException) && !(e.getCause() instanceof DataConversionException)) {
+            LOG.error("Processing record incurs an unexpected exception: ", e);
             throw new RuntimeException(e.getCause());
           }
           errRecords++;
@@ -511,6 +516,12 @@ public class Task implements TaskIFace {
     LOG.error(String.format("Task %s failed", this.taskId), t);
     this.taskState.setWorkingState(WorkUnitState.WorkingState.FAILED);
     this.taskState.setProp(ConfigurationKeys.TASK_FAILURE_EXCEPTION_KEY, Throwables.getStackTraceAsString(t));
+
+    // Send task failure event
+    FailureEventBuilder failureEvent = new FailureEventBuilder(FAILED_TASK_EVENT);
+    failureEvent.setRootCause(t);
+    failureEvent.addMetadata(TASK_STATE, this.taskState.toString());
+    failureEvent.submit(taskContext.getTaskMetrics().getMetricContext());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/plugins/email/EmailNotificationPlugin.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/plugins/email/EmailNotificationPlugin.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/plugins/email/EmailNotificationPlugin.java
index 3cfc67b..5122391 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/plugins/email/EmailNotificationPlugin.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/plugins/email/EmailNotificationPlugin.java
@@ -117,7 +117,9 @@ public class EmailNotificationPlugin extends BaseIdlePluginImpl {
     }
 
     private static String getEmailBody(JobExecutionState state, RunningState previousStatus, RunningState newStatus) {
-      return new StringBuilder().append("RunningState: ").append(newStatus.toString()).append("\n")
+      return new StringBuilder().append("JobId: ")
+          .append(state.getJobSpec().getConfig().getString(ConfigurationKeys.JOB_ID_KEY))
+          .append("RunningState: ").append(newStatus.toString()).append("\n")
           .append("JobExecutionState: ").append(state.getJobSpec().toLongString()).append("\n")
           .append("ExecutionMetadata: ").append(state.getExecutionMetadata()).toString();
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java
index f4f3dfb..ef57684 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.gobblin.runtime.util.TaskMetrics;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.testng.Assert;
@@ -348,6 +349,7 @@ public class TaskContinuousTest {
 
     // Create a mock TaskContext
     TaskContext mockTaskContext = mock(TaskContext.class);
+    when(mockTaskContext.getTaskMetrics()).thenReturn(TaskMetrics.get(taskState));
     when(mockTaskContext.getExtractor()).thenReturn(mockExtractor);
     when(mockTaskContext.getRawSourceExtractor()).thenReturn(mockExtractor);
     when(mockTaskContext.getWatermarkStorage()).thenReturn(mockWatermarkStorage);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskTest.java
index 184943d..84c4f0a 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskTest.java
@@ -35,6 +35,7 @@ import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.gobblin.runtime.util.TaskMetrics;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.testng.Assert;
@@ -109,6 +110,7 @@ public class TaskTest {
     taskState.addAll(overrides);
     // Create a mock TaskContext
     TaskContext mockTaskContext = mock(TaskContext.class);
+    when(mockTaskContext.getTaskMetrics()).thenReturn(TaskMetrics.get(taskState));
     when(mockTaskContext.getExtractor()).thenReturn(new FailOnceExtractor());
     when(mockTaskContext.getForkOperator()).thenReturn(new IdentityForkOperator());
     when(mockTaskContext.getTaskState()).thenReturn(taskState);