You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ib...@apache.org on 2017/11/07 23:25:54 UTC
incubator-gobblin git commit: [GOBBLIN-273] Add job failure monitoring
Repository: incubator-gobblin
Updated Branches:
refs/heads/master c385f1ddd -> 6198120e3
[GOBBLIN-273] Add job failure monitoring
Closes #2125 from zxcware/msg
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6198120e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6198120e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6198120e
Branch: refs/heads/master
Commit: 6198120e3f9f969064ee1bc2686f38f41ec32172
Parents: c385f1d
Author: zhchen <zh...@linkedin.com>
Authored: Tue Nov 7 15:25:46 2017 -0800
Committer: Issac Buenrostro <ib...@apache.org>
Committed: Tue Nov 7 15:25:46 2017 -0800
----------------------------------------------------------------------
.../configuration/ConfigurationKeys.java | 1 +
.../apache/gobblin/metrics/MetricContext.java | 25 ++--
.../metrics/event/FailureEventBuilder.java | 116 +++++++++++++++++++
.../reporter/FileFailureEventReporter.java | 94 +++++++++++++++
.../reporter/OutputStreamEventReporter.java | 3 +-
.../reporter/FileFailureEventReporterTest.java | 69 +++++++++++
.../apache/gobblin/metrics/GobblinMetrics.java | 41 +++++++
.../gobblin/r2/R2RestResponseHandler.java | 25 ++--
.../apache/gobblin/writer/AsyncHttpWriter.java | 29 ++++-
.../org/apache/gobblin/runtime/JobState.java | 26 ++++-
.../gobblin/runtime/SafeDatasetCommit.java | 16 +++
.../java/org/apache/gobblin/runtime/Task.java | 11 ++
.../plugins/email/EmailNotificationPlugin.java | 4 +-
.../gobblin/runtime/TaskContinuousTest.java | 2 +
.../org/apache/gobblin/runtime/TaskTest.java | 2 +
15 files changed, 437 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 0ef8416..c8de615 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -618,6 +618,7 @@ public class ConfigurationKeys {
public static final String METRICS_LOG_DIR_KEY = METRICS_CONFIGURATIONS_PREFIX + "log.dir";
public static final String METRICS_FILE_SUFFIX = METRICS_CONFIGURATIONS_PREFIX + "reporting.file.suffix";
public static final String DEFAULT_METRICS_FILE_SUFFIX = "";
+ public static final String FAILURE_LOG_DIR_KEY = "failure.log.dir";
// JMX-based reporting
public static final String METRICS_REPORTING_JMX_ENABLED_KEY =
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
index e3dd939..dcc1029 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
@@ -184,6 +184,20 @@ public class MetricContext extends MetricRegistry implements ReportableContext,
}
/**
+ * Inject the tags of this {@link MetricContext} to the given {@link GobblinTrackingEvent}
+ */
+ private void injectTagsToEvent(GobblinTrackingEvent event) {
+   Map<String, String> eventMetadata = event.getMetadata();
+   Map<String, String> mergedMetadata = Maps.newHashMap();
+   // Tags are copied first (stringified) so that metadata already present on the event
+   // overrides a tag carrying the same key.
+   getTagMap().forEach((tagKey, tagValue) -> mergedMetadata.put(tagKey, tagValue.toString()));
+   mergedMetadata.putAll(eventMetadata);
+   event.setMetadata(mergedMetadata);
+ }
+
+ /**
* Submit {@link org.apache.gobblin.metrics.GobblinTrackingEvent} to all notification listeners attached to this or any
* ancestor {@link org.apache.gobblin.metrics.MetricContext}s. The argument for this method is mutated by the method, so it
* should not be reused by the caller.
@@ -193,16 +207,7 @@ public class MetricContext extends MetricRegistry implements ReportableContext,
*/
public void submitEvent(GobblinTrackingEvent nonReusableEvent) {
nonReusableEvent.setTimestamp(System.currentTimeMillis());
-
- // Inject metric context tags into event metadata.
- Map<String, String> originalMetadata = nonReusableEvent.getMetadata();
- Map<String, Object> tags = getTagMap();
- Map<String, String> newMetadata = Maps.newHashMap();
- for(Map.Entry<String, Object> entry : tags.entrySet()) {
- newMetadata.put(entry.getKey(), entry.getValue().toString());
- }
- newMetadata.putAll(originalMetadata);
- nonReusableEvent.setMetadata(newMetadata);
+ injectTagsToEvent(nonReusableEvent);
EventNotification notification = new EventNotification(nonReusableEvent);
sendNotification(notification);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java
new file mode 100644
index 0000000..89f83f5
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.event;
+
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+
+import com.google.common.collect.Maps;
+
+import lombok.Getter;
+
+
+/**
+ * Builds a failure event: a {@link GobblinTrackingEvent} whose metadata maps
+ * {@value EventSubmitter#EVENT_TYPE} to {@value #EVENT_TYPE}
+ *
+ * <p>
+ * Note: A {@link FailureEventBuilder} instance is not reusable; {@link #build()} hands the builder's own
+ * metadata map to the event it creates
+ */
+public class FailureEventBuilder {
+  private static final String EVENT_TYPE = "FailureEvent";
+  private static final String EVENT_NAMESPACE = "gobblin.event";
+  private static final String ROOT_CAUSE = "rootException";
+
+  @Getter
+  private final String name;
+  @Getter
+  private final String namespace;
+  private final Map<String, String> metadata;
+
+  private Throwable rootCause;
+
+  /**
+   * Create a builder for an event called {@code name} in the default namespace {@value #EVENT_NAMESPACE}
+   */
+  public FailureEventBuilder(String name) {
+    this(name, EVENT_NAMESPACE);
+  }
+
+  /**
+   * Create a builder for an event called {@code name} in the given {@code namespace}
+   */
+  public FailureEventBuilder(String name, String namespace) {
+    this.name = name;
+    this.namespace = namespace;
+    metadata = Maps.newHashMap();
+    // This entry is what marks the built event as a failure event, see isFailureEvent
+    metadata.put(EventSubmitter.EVENT_TYPE, EVENT_TYPE);
+  }
+
+  /**
+   * Given a throwable, get its root cause; {@link #build()} stores its stack trace as metadata
+   */
+  public void setRootCause(Throwable t) {
+    rootCause = getRootCause(t);
+  }
+
+  /**
+   * Add a metadata pair
+   */
+  public void addMetadata(String key, String value) {
+    metadata.put(key, value);
+  }
+
+  /**
+   * Add additional metadata
+   */
+  public void addAdditionalMetadata(Map<String, String> additionalMetadata) {
+    metadata.putAll(additionalMetadata);
+  }
+
+  /**
+   * Build as {@link GobblinTrackingEvent} carrying this builder's namespace, name and metadata. If a root
+   * cause has been set, its stack trace is stored in the metadata under {@value #ROOT_CAUSE}
+   */
+  public GobblinTrackingEvent build() {
+    if (rootCause != null) {
+      metadata.put(ROOT_CAUSE, ExceptionUtils.getStackTrace(rootCause));
+    }
+    // Use the namespace supplied at construction time, not unconditionally the default one
+    return new GobblinTrackingEvent(0L, namespace, name, metadata);
+  }
+
+  /**
+   * Build the event and submit it to the given {@link MetricContext}
+   */
+  public void submit(MetricContext context) {
+    context.submitEvent(build());
+  }
+
+  /**
+   * Check if the given {@link GobblinTrackingEvent} is a failure event
+   */
+  public static boolean isFailureEvent(GobblinTrackingEvent event) {
+    String eventType = event.getMetadata().get(EventSubmitter.EVENT_TYPE);
+    // Null-safe comparison: an event without an event type is not a failure event
+    return StringUtils.equals(EVENT_TYPE, eventType);
+  }
+
+  /**
+   * Get the root cause of the given throwable, or the throwable itself when it has no nested cause
+   */
+  private static Throwable getRootCause(Throwable t) {
+    Throwable rootCause = ExceptionUtils.getRootCause(t);
+    if (rootCause == null) {
+      rootCause = t;
+    }
+    return rootCause;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporter.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporter.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporter.java
new file mode 100644
index 0000000..42e858e
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.Queue;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.FailureEventBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Charsets;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * An {@link OutputStreamEventReporter} that reports only failure events built by {@link FailureEventBuilder}.
+ * It won't create the failure log file until a failure event is processed
+ */
+@Slf4j
+public class FileFailureEventReporter extends OutputStreamEventReporter {
+  private final FileSystem fs;
+  private final Path failureLogFile;
+  // Set once the first setup attempt has finished, whether it succeeded or not
+  private volatile boolean hasSetupOutputStream;
+
+  /**
+   * @param context the {@link MetricContext} whose events are reported
+   * @param fs the {@link FileSystem} holding the failure log file
+   * @param failureLogFile the file failure events are written to; created or appended to lazily
+   */
+  public FileFailureEventReporter(MetricContext context, FileSystem fs, Path failureLogFile)
+      throws IOException {
+    super(OutputStreamEventReporter.forContext(context));
+    this.fs = fs;
+    this.failureLogFile = failureLogFile;
+    hasSetupOutputStream = false;
+  }
+
+  /**
+   * Queue the event only if it is a failure event; any other event is dropped
+   */
+  @Override
+  public void addEventToReportingQueue(GobblinTrackingEvent event) {
+    if (FailureEventBuilder.isFailureEvent(event)) {
+      super.addEventToReportingQueue(event);
+    }
+  }
+
+  /**
+   * Report the queued events, opening the failure log first. Nothing happens for an empty queue, so the
+   * log file is not touched until there is something to write
+   */
+  @Override
+  public void reportEventQueue(Queue<GobblinTrackingEvent> queue) {
+    if (queue.size() > 0) {
+      setupOutputStream();
+      super.reportEventQueue(queue);
+    }
+  }
+
+  /**
+   * Set up the {@link OutputStream} to the {@link #failureLogFile} only once. The attempt is not repeated
+   * after a failure: the flag is set in the {@code finally} clause and the {@link IOException} is rethrown
+   * as a {@link RuntimeException}
+   */
+  private void setupOutputStream() {
+    synchronized (failureLogFile) {
+      // Setup is done by some thread
+      if (hasSetupOutputStream) {
+        return;
+      }
+
+      try {
+        boolean append = false;
+        if (fs.exists(failureLogFile)) {
+          // SLF4J substitutes "{}" placeholders, not printf-style "%s"
+          log.info("Failure log file {} already exists, appending to it", failureLogFile);
+          append = true;
+        }
+        OutputStream outputStream = append ? fs.append(failureLogFile) : fs.create(failureLogFile);
+        // Registered with the closer so the stream is closed together with this reporter
+        output = this.closer.register(new PrintStream(outputStream, false, Charsets.UTF_8.toString()));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      } finally {
+        hasSetupOutputStream = true;
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/OutputStreamEventReporter.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/OutputStreamEventReporter.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/OutputStreamEventReporter.java
index e0415fe..48c4f57 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/OutputStreamEventReporter.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/OutputStreamEventReporter.java
@@ -48,7 +48,8 @@ public class OutputStreamEventReporter extends EventReporter {
private static final Logger LOGGER = LoggerFactory.getLogger(OutputStreamEventReporter.class);
private static final int CONSOLE_WIDTH = 80;
- private final PrintStream output;
+ protected PrintStream output;
+
protected final AvroSerializer<GobblinTrackingEvent> serializer;
private final ByteArrayOutputStream outputBuffer;
private final PrintStream outputBufferPrintStream;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporterTest.java b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporterTest.java
new file mode 100644
index 0000000..389e6ab
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporterTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.FailureEventBuilder;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.annotations.Test;
+
+import avro.shaded.com.google.common.collect.Maps;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Checks that {@link FileFailureEventReporter} leaves the failure log untouched for a non-failure event
+ * and opens it (in append mode here) and writes to it once a failure event is reported
+ */
+public class FileFailureEventReporterTest {
+ @Test
+ public void testReport()
+ throws IOException {
+ MetricContext testContext = MetricContext.builder(getClass().getCanonicalName()).build();
+ FileSystem fs = mock(FileSystem.class);
+ Path failureLogPath = mock(Path.class);
+ FSDataOutputStream outputStream = mock(FSDataOutputStream.class);
+
+ FileFailureEventReporter reporter = new FileFailureEventReporter(testContext, fs, failureLogPath);
+ // The log file is reported as existing, so the reporter is expected to append rather than create
+ when(fs.exists(any())).thenReturn(true);
+ when(fs.append(any())).thenReturn(outputStream);
+
+ final String eventName = "testEvent";
+ final String eventNamespace = "testNamespace";
+ // A plain event with empty metadata, i.e. without the failure event type
+ GobblinTrackingEvent event =
+ new GobblinTrackingEvent(0L, eventNamespace, eventName, Maps.newHashMap());
+
+ // Noop on normal event: the log file is neither opened nor written
+ testContext.submitEvent(event);
+ verify(fs, never()).append(failureLogPath);
+ verify(outputStream, never()).write(anyByte());
+
+ // Process failure event
+ FailureEventBuilder failureEvent = new FailureEventBuilder(eventName, eventNamespace);
+ failureEvent.submit(testContext);
+ reporter.report();
+ // Failure log output is setup exactly once, through append
+ verify(fs, times(1)).append(failureLogPath);
+ // Report successfully
+ // NOTE(review): this stub is installed after reporter.report() has already run, so it cannot
+ // influence the write verified below - presumably redundant; confirm before removing
+ doAnswer( invocation -> null ).when(outputStream).write(any(byte[].class), anyInt(), anyInt());
+ verify(outputStream, times(1)).write(any(byte[].class), anyInt(), anyInt());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
index e123158..ae20031 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
@@ -26,6 +26,7 @@ import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
+import org.apache.gobblin.metrics.reporter.FileFailureEventReporter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -405,6 +406,7 @@ public class GobblinMetrics {
buildGraphiteMetricReporter(properties);
buildInfluxDBMetricReporter(properties);
buildCustomMetricReporters(properties);
+ buildFileFailureEventReporter(properties);
// Start reporters that implement org.apache.gobblin.metrics.report.ScheduledReporter
RootMetricContext.get().startReporting();
@@ -491,7 +493,9 @@ public class GobblinMetrics {
}
OutputStream output = append ? fs.append(metricLogFile) : fs.create(metricLogFile, true);
+ // Add metrics reporter
OutputStreamReporter.Factory.newBuilder().outputTo(output).build(properties);
+ // Set up events reporter at the same time!!
this.codahaleScheduledReporters.add(this.codahaleReportersCloser
.register(OutputStreamEventReporter.forContext(RootMetricContext.get()).outputTo(output).build()));
@@ -501,6 +505,43 @@ public class GobblinMetrics {
}
}
+ private void buildFileFailureEventReporter(Properties properties) {
+   // Without a configured failure log directory no failure reporter is registered
+   if (!properties.containsKey(ConfigurationKeys.FAILURE_LOG_DIR_KEY)) {
+     LOGGER.error(
+         "Not reporting failure to log files because " + ConfigurationKeys.FAILURE_LOG_DIR_KEY + " is undefined");
+     return;
+   }
+
+   try {
+     URI fileSystemUri =
+         URI.create(properties.getProperty(ConfigurationKeys.FS_URI_KEY, ConfigurationKeys.LOCAL_FS_URI));
+     FileSystem fileSystem = FileSystem.get(fileSystemUri, new Configuration());
+
+     // Failure logs live in a sub-directory named after this metrics instance
+     Path jobFailureLogDir =
+         new Path(properties.getProperty(ConfigurationKeys.FAILURE_LOG_DIR_KEY), this.getName());
+     boolean dirAvailable = fileSystem.exists(jobFailureLogDir) || fileSystem.mkdirs(jobFailureLogDir);
+     if (!dirAvailable) {
+       LOGGER.error("Failed to create failure log directory for metrics " + this.getName());
+       return;
+     }
+
+     // Optional file name suffix from the properties, normalized to start with a dot
+     String fileSuffix =
+         properties.getProperty(ConfigurationKeys.METRICS_FILE_SUFFIX, ConfigurationKeys.DEFAULT_METRICS_FILE_SUFFIX);
+     if (!Strings.isNullOrEmpty(fileSuffix) && !fileSuffix.startsWith(".")) {
+       fileSuffix = "." + fileSuffix;
+     }
+
+     // The file name is derived from this instance's id
+     Path failureLogFile = new Path(jobFailureLogDir, this.id + fileSuffix + ".failure.log");
+     FileFailureEventReporter failureReporter =
+         new FileFailureEventReporter(RootMetricContext.get(), fileSystem, failureLogFile);
+     this.codahaleScheduledReporters.add(this.codahaleReportersCloser.register(failureReporter));
+
+     LOGGER.info("Will start reporting failure to directory " + jobFailureLogDir);
+   } catch (IOException ioe) {
+     LOGGER.error("Failed to build file failure event reporter for job " + this.id, ioe);
+   }
+ }
+
private void buildJmxMetricReporter(Properties properties) {
if (!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_JMX_ENABLED_KEY,
ConfigurationKeys.DEFAULT_METRICS_REPORTING_JMX_ENABLED))) {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/r2/R2RestResponseHandler.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/r2/R2RestResponseHandler.java b/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/r2/R2RestResponseHandler.java
index c947cd4..0ae794b 100644
--- a/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/r2/R2RestResponseHandler.java
+++ b/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/r2/R2RestResponseHandler.java
@@ -19,12 +19,14 @@ package org.apache.gobblin.r2;
import com.google.common.collect.Maps;
import com.linkedin.r2.message.rest.RestRequest;
import com.linkedin.r2.message.rest.RestResponse;
+
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.http.ResponseHandler;
import org.apache.gobblin.http.StatusType;
import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.FailureEventBuilder;
import org.apache.gobblin.net.Request;
import org.apache.gobblin.utils.HttpConstants;
import org.apache.gobblin.utils.HttpUtils;
@@ -48,10 +50,9 @@ public class R2RestResponseHandler implements ResponseHandler<RestRequest, RestR
public static final String CONTENT_TYPE_HEADER = "Content-Type";
private final String R2_RESPONSE_EVENT_NAMESPACE = "r2.response";
- private final String R2_FAILED_REQUEST = "R2FailedRequest";
+ private final String R2_FAILED_REQUEST_EVENT = "r2FailedRequest";
private final Set<String> errorCodeWhitelist;
- MetricContext metricsContext;
- EventSubmitter eventSubmitter;
+ private MetricContext metricsContext;
public R2RestResponseHandler() {
this(new HashSet<>(), Instrumented.getMetricContext(new State(), R2RestResponseHandler.class));
@@ -60,7 +61,6 @@ public class R2RestResponseHandler implements ResponseHandler<RestRequest, RestR
public R2RestResponseHandler(Set<String> errorCodeWhitelist, MetricContext metricContext) {
this.errorCodeWhitelist = errorCodeWhitelist;
this.metricsContext = metricContext;
- eventSubmitter = new EventSubmitter.Builder(metricsContext, R2_RESPONSE_EVENT_NAMESPACE).build();
}
@Override
@@ -74,11 +74,20 @@ public class R2RestResponseHandler implements ResponseHandler<RestRequest, RestR
status.setContent(response.getEntity());
status.setContentType(response.getHeader(CONTENT_TYPE_HEADER));
} else {
+ log.info("Receive an unsuccessful response with status code: " + statusCode);
+
Map<String, String> metadata = Maps.newHashMap();
- metadata.put(HttpConstants.REQUEST, request.toString());
metadata.put(HttpConstants.STATUS_CODE, String.valueOf(statusCode));
- eventSubmitter.submit(R2_FAILED_REQUEST, metadata);
- log.info("Receive an unsuccessful response with status code: " + statusCode);
+ metadata.put(HttpConstants.REQUEST, request.toString());
+ if (status.getType() != StatusType.CONTINUE) {
+ FailureEventBuilder failureEvent = new FailureEventBuilder(R2_FAILED_REQUEST_EVENT);
+ failureEvent.addAdditionalMetadata(metadata);
+ failureEvent.submit(metricsContext);
+ } else {
+ GobblinTrackingEvent event =
+ new GobblinTrackingEvent(0L, R2_RESPONSE_EVENT_NAMESPACE, R2_FAILED_REQUEST_EVENT, metadata);
+ metricsContext.submitEvent(event);
+ }
}
return status;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/writer/AsyncHttpWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/writer/AsyncHttpWriter.java b/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/writer/AsyncHttpWriter.java
index 3396de9..6c1b105 100644
--- a/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/writer/AsyncHttpWriter.java
+++ b/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/writer/AsyncHttpWriter.java
@@ -20,6 +20,9 @@ package org.apache.gobblin.writer;
import java.io.IOException;
import java.util.Queue;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.FailureEventBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +48,8 @@ import org.apache.gobblin.http.ResponseStatus;
@Slf4j
public class AsyncHttpWriter<D, RQ, RP> extends AbstractAsyncDataWriter<D> {
private static final Logger LOG = LoggerFactory.getLogger(AsyncHttpWriter.class);
+ private static final String ASYNC_REQUEST = "asyncRequest";
+ private static final String FATAL_ASYNC_HTTP_WRITE_EVENT = "fatalAsyncHttpWrite";
public static final int DEFAULT_MAX_ATTEMPTS = 3;
@@ -53,12 +58,15 @@ public class AsyncHttpWriter<D, RQ, RP> extends AbstractAsyncDataWriter<D> {
private final AsyncRequestBuilder<D, RQ> requestBuilder;
private final int maxAttempts;
+ private final MetricContext context;
+
public AsyncHttpWriter(AsyncHttpWriterBuilder builder) {
super(builder.getQueueCapacity());
this.httpClient = builder.getClient();
this.requestBuilder = builder.getAsyncRequestBuilder();
this.responseHandler = builder.getResponseHandler();
this.maxAttempts = builder.getMaxAttempts();
+ this.context = Instrumented.getMetricContext(builder.getState(), AsyncHttpWriter.class);
}
@Override
@@ -82,8 +90,9 @@ public class AsyncHttpWriter<D, RQ, RP> extends AbstractAsyncDataWriter<D> {
LOG.error("Fail to send request");
LOG.info(asyncRequest.toString());
- onFailure(asyncRequest, e);
- throw new DispatchException("Write failed on IOException", e);
+ DispatchException de = new DispatchException("Write failed on IOException", e);
+ onFailure(asyncRequest, de);
+ throw de;
} else {
continue;
}
@@ -152,13 +161,29 @@ public class AsyncHttpWriter<D, RQ, RP> extends AbstractAsyncDataWriter<D> {
/**
* Callback on failing to send the asyncRequest
+ *
+ * @deprecated Use {@link #onFailure(AsyncRequest, DispatchException)}
*/
+ @Deprecated
protected void onFailure(AsyncRequest<D, RQ> asyncRequest, Throwable throwable) {
for (AsyncRequest.Thunk thunk: asyncRequest.getThunks()) {
thunk.callback.onFailure(throwable);
}
}
+ /**
+  * Callback on failing to send the asyncRequest with a {@link DispatchException}. A fatal exception is
+  * additionally reported as a failure event before the callbacks of the request are notified.
+  */
+ protected void onFailure(AsyncRequest<D, RQ> asyncRequest, DispatchException exception) {
+   boolean fatal = exception.isFatal();
+   if (fatal) {
+     // Record the root cause and the request that could not be written
+     FailureEventBuilder fatalWriteEvent = new FailureEventBuilder(FATAL_ASYNC_HTTP_WRITE_EVENT);
+     fatalWriteEvent.setRootCause(exception);
+     fatalWriteEvent.addMetadata(ASYNC_REQUEST, asyncRequest.toString());
+     fatalWriteEvent.submit(context);
+   }
+
+   // Every thunk of the request gets notified, fatal or not
+   for (AsyncRequest.Thunk pending : asyncRequest.getThunks()) {
+     pending.callback.onFailure(exception);
+   }
+ }
+
+
@Override
public void close()
throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
index 3bd3006..6c16de1 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
@@ -554,11 +554,7 @@ public class JobState extends SourceState {
public void toJson(JsonWriter jsonWriter, boolean keepConfig)
throws IOException {
jsonWriter.beginObject();
-
- jsonWriter.name("job name").value(this.getJobName()).name("job id").value(this.getJobId()).name("job state")
- .value(this.getState().name()).name("start time").value(this.getStartTime()).name("end time")
- .value(this.getEndTime()).name("duration").value(this.getDuration()).name("tasks").value(this.getTaskCount())
- .name("completed tasks").value(this.getCompletedTasks());
+ writeStateSummary(jsonWriter);
jsonWriter.name("task states");
jsonWriter.beginArray();
@@ -578,6 +574,19 @@ public class JobState extends SourceState {
jsonWriter.endObject();
}
+ /**
+  * Write the job-level summary fields (job name, job id, job state, start time, end time, duration,
+  * task count and completed task count) as name/value pairs to the json document
+  *
+  * @param jsonWriter a {@link com.google.gson.stream.JsonWriter}
+  *                   used to write the json document
+  */
+ protected void writeStateSummary(JsonWriter jsonWriter) throws IOException {
+   jsonWriter.name("job name").value(this.getJobName());
+   jsonWriter.name("job id").value(this.getJobId());
+   jsonWriter.name("job state").value(this.getState().name());
+   jsonWriter.name("start time").value(this.getStartTime());
+   jsonWriter.name("end time").value(this.getEndTime());
+   jsonWriter.name("duration").value(this.getDuration());
+   jsonWriter.name("tasks").value(this.getTaskCount());
+   jsonWriter.name("completed tasks").value(this.getCompletedTasks());
+ }
+
protected void propsToJson(JsonWriter jsonWriter)
throws IOException {
jsonWriter.beginObject();
@@ -805,5 +814,12 @@ public class JobState extends SourceState {
public void overrideWith(Properties properties) {
throw new UnsupportedOperationException();
}
+
+ @Override
+ protected void writeStateSummary(JsonWriter jsonWriter)
+     throws IOException {
+   // Inherited summary fields first, followed by the dataset URN
+   super.writeStateSummary(jsonWriter);
+   jsonWriter.name("datasetUrn");
+   jsonWriter.value(this.getDatasetUrn());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index 07d167b..d6a1b58 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -39,6 +39,7 @@ import org.apache.gobblin.lineage.LineageException;
import org.apache.gobblin.lineage.LineageInfo;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.FailureEventBuilder;
import org.apache.gobblin.publisher.CommitSequencePublisher;
import org.apache.gobblin.publisher.DataPublisher;
import org.apache.gobblin.publisher.UnpublishedHandling;
@@ -63,6 +64,9 @@ final class SafeDatasetCommit implements Callable<Void> {
private static final Object GLOBAL_LOCK = new Object();
+ private static final String DATASET_STATE = "datasetState";
+ private static final String FAILED_DATASET_EVENT = "failedDataset";
+
private final boolean shouldCommitDataInJob;
private final boolean isJobCancelled;
private final DeliverySemantics deliverySemantics;
@@ -100,6 +104,8 @@ final class SafeDatasetCommit implements Callable<Void> {
log.error("Failed to instantiate data publisher for dataset %s of job %s.", this.datasetUrn,
this.jobContext.getJobId(), roe);
throw new RuntimeException(roe);
+ } finally {
+ maySubmitFailureEvent(datasetState);
}
if (this.isJobCancelled) {
@@ -163,7 +169,9 @@ final class SafeDatasetCommit implements Callable<Void> {
} finally {
try {
finalizeDatasetState(datasetState, datasetUrn);
+ maySubmitFailureEvent(datasetState);
submitLineageEvent(datasetState.getTaskStates());
+
if (commitSequenceBuilder.isPresent()) {
buildAndExecuteCommitSequence(commitSequenceBuilder.get(), datasetState, datasetUrn);
datasetState.setState(JobState.RunningState.COMMITTED);
@@ -181,6 +189,14 @@ final class SafeDatasetCommit implements Callable<Void> {
return null;
}
+ /**
+  * Submits a {@code failedDataset} failure event to the metric context when the given
+  * dataset state is {@link JobState.RunningState#FAILED}; does nothing for any other state.
+  * The event carries the string form of the dataset state as its {@code datasetState} metadata.
+  *
+  * @param datasetState the dataset state to inspect
+  */
+ private void maySubmitFailureEvent(JobState.DatasetState datasetState) {
+   // Only failed datasets produce an event.
+   if (datasetState.getState() != JobState.RunningState.FAILED) {
+     return;
+   }
+
+   FailureEventBuilder builder = new FailureEventBuilder(FAILED_DATASET_EVENT);
+   builder.addMetadata(DATASET_STATE, datasetState.toString());
+   builder.submit(metricContext);
+ }
+
private void submitLineageEvent(Collection<TaskState> states) {
if (states.size() == 0) {
return;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index 65cf611..3265ab8 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.BooleanUtils;
+import org.apache.gobblin.metrics.event.FailureEventBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -114,6 +115,9 @@ public class Task implements TaskIFace {
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+ private static final String TASK_STATE = "taskState";
+ private static final String FAILED_TASK_EVENT = "failedTask";
+
private final String jobId;
private final String taskId;
private final String taskKey;
@@ -448,6 +452,7 @@ public class Task implements TaskIFace {
}
} catch (Exception e) {
if (!(e instanceof DataConversionException) && !(e.getCause() instanceof DataConversionException)) {
+ LOG.error("Processing record incurs an unexpected exception: ", e);
throw new RuntimeException(e.getCause());
}
errRecords++;
@@ -511,6 +516,12 @@ public class Task implements TaskIFace {
LOG.error(String.format("Task %s failed", this.taskId), t);
this.taskState.setWorkingState(WorkUnitState.WorkingState.FAILED);
this.taskState.setProp(ConfigurationKeys.TASK_FAILURE_EXCEPTION_KEY, Throwables.getStackTraceAsString(t));
+
+ // Send task failure event
+ FailureEventBuilder failureEvent = new FailureEventBuilder(FAILED_TASK_EVENT);
+ failureEvent.setRootCause(t);
+ failureEvent.addMetadata(TASK_STATE, this.taskState.toString());
+ failureEvent.submit(taskContext.getTaskMetrics().getMetricContext());
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/plugins/email/EmailNotificationPlugin.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/plugins/email/EmailNotificationPlugin.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/plugins/email/EmailNotificationPlugin.java
index 3cfc67b..5122391 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/plugins/email/EmailNotificationPlugin.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/plugins/email/EmailNotificationPlugin.java
@@ -117,7 +117,9 @@ public class EmailNotificationPlugin extends BaseIdlePluginImpl {
}
+ /**
+  * Builds the plain-text body of the notification email. The body lists, one labelled
+  * entry per line: the job id read from the job spec config, the new running state,
+  * the long string form of the job spec, and the execution metadata.
+  *
+  * @param state the job execution state supplying the job spec and execution metadata
+  * @param previousStatus the state the job was in before the transition (not used in the body)
+  * @param newStatus the state the job has transitioned to
+  * @return the email body text
+  */
private static String getEmailBody(JobExecutionState state, RunningState previousStatus, RunningState newStatus) {
- return new StringBuilder().append("RunningState: ").append(newStatus.toString()).append("\n")
+ // Each entry ends with a newline so the job id is not glued to the "RunningState: " label.
+ return new StringBuilder().append("JobId: ")
+ .append(state.getJobSpec().getConfig().getString(ConfigurationKeys.JOB_ID_KEY)).append("\n")
+ .append("RunningState: ").append(newStatus.toString()).append("\n")
.append("JobExecutionState: ").append(state.getJobSpec().toLongString()).append("\n")
.append("ExecutionMetadata: ").append(state.getExecutionMetadata()).toString();
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java
index f4f3dfb..ef57684 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.gobblin.runtime.util.TaskMetrics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.testng.Assert;
@@ -348,6 +349,7 @@ public class TaskContinuousTest {
// Create a mock TaskContext
TaskContext mockTaskContext = mock(TaskContext.class);
+ when(mockTaskContext.getTaskMetrics()).thenReturn(TaskMetrics.get(taskState));
when(mockTaskContext.getExtractor()).thenReturn(mockExtractor);
when(mockTaskContext.getRawSourceExtractor()).thenReturn(mockExtractor);
when(mockTaskContext.getWatermarkStorage()).thenReturn(mockWatermarkStorage);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6198120e/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskTest.java
index 184943d..84c4f0a 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskTest.java
@@ -35,6 +35,7 @@ import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.gobblin.runtime.util.TaskMetrics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.testng.Assert;
@@ -109,6 +110,7 @@ public class TaskTest {
taskState.addAll(overrides);
// Create a mock TaskContext
TaskContext mockTaskContext = mock(TaskContext.class);
+ when(mockTaskContext.getTaskMetrics()).thenReturn(TaskMetrics.get(taskState));
when(mockTaskContext.getExtractor()).thenReturn(new FailOnceExtractor());
when(mockTaskContext.getForkOperator()).thenReturn(new IdentityForkOperator());
when(mockTaskContext.getTaskState()).thenReturn(taskState);