You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/05/11 17:06:43 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1127] Provide an option to make metric reporting instantiation failures fatal

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2391157  [GOBBLIN-1127] Provide an option to make metric reporting instantiatio…
2391157 is described below

commit 239115778a08590d7ab5dfa32334efabe0e4fb49
Author: sv2000 <su...@gmail.com>
AuthorDate: Mon May 11 10:04:28 2020 -0700

    [GOBBLIN-1127] Provide an option to make metric reporting instantiation failures fatal
    
    Closes #2967 from sv2000/metricReportFatal
---
 .../cluster/GobblinClusterConfigurationKeys.java   |  7 ++
 .../apache/gobblin/cluster/GobblinTaskRunner.java  | 40 ++++++++--
 .../gobblin/cluster/GobblinTaskRunnerTest.java     | 19 +++++
 .../gobblin/compaction/mapreduce/MRCompactor.java  | 10 ++-
 .../gobblin/metrics/EventReporterException.java}   | 35 +++++----
 .../org/apache/gobblin/metrics/GobblinMetrics.java | 86 ++++++++++++----------
 .../gobblin/metrics/MetricReporterException.java}  | 33 ++++-----
 .../apache/gobblin/metrics/ReporterSinkType.java}  | 30 ++------
 .../apache/gobblin/metrics/GobblinMetricsTest.java | 59 ++++++++++++++-
 .../metrics/kafka/KafkaReporterFactory.java        | 13 ++--
 .../gobblin/runtime/mapreduce/MRJobLauncher.java   | 12 ++-
 .../runtime/services/MetricsReportingService.java  | 13 +++-
 .../java/org/apache/gobblin/yarn/YarnService.java  | 10 ++-
 13 files changed, 251 insertions(+), 116 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 128a5d6..b545928 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -197,4 +197,11 @@ public class GobblinClusterConfigurationKeys {
   public static final String CONTAINER_ID_KEY = GOBBLIN_HELIX_PREFIX + "containerId";
 
   public static final String GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX = GOBBLIN_CLUSTER_PREFIX + "sysProps";
+
+  // Gobblin TaskRunner configuration keys: if true, metric (resp. event) reporter start-up failures abort startup.
+  public static final String GOBBLIN_CLUSTER_TASK_RUNNER_METRIC_REPORTING_FAILURE_FATAL = "gobblin.cluster.taskRunner.isMetricReportingFailureFatal";
+  public static final boolean DEFAULT_GOBBLIN_CLUSTER_TASK_RUNNER_METRIC_REPORTING_FAILURE_FATAL = false;
+
+  public static final String GOBBLIN_CLUSTER_TASK_RUNNER_EVENT_REPORTING_FAILURE_FATAL = "gobblin.cluster.taskRunner.isEventReportingFailureFatal";
+  public static final boolean DEFAULT_GOBBLIN_CLUSTER_TASK_RUNNER_EVENT_REPORTING_FAILURE_FATAL = false;
 }
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 8e6cdd1..6a8fed4 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -83,7 +83,9 @@ import lombok.Getter;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.EventReporterException;
 import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricReporterException;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.FileUtils;
@@ -151,6 +153,8 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
   protected final FileSystem fs;
   protected final String applicationName;
   protected final String applicationId;
+  private final boolean isMetricReportingFailureFatal;
+  private final boolean isEventReportingFailureFatal;
 
   public GobblinTaskRunner(String applicationName,
       String helixInstanceName,
@@ -170,6 +174,15 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
     this.appWorkPath = initAppWorkDir(config, appWorkDirOptional);
     this.clusterConfig = saveConfigToFile(config);
     this.clusterName = this.clusterConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+
+    this.isMetricReportingFailureFatal = ConfigUtils.getBoolean(this.clusterConfig,
+        GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_TASK_RUNNER_METRIC_REPORTING_FAILURE_FATAL,
+        GobblinClusterConfigurationKeys.DEFAULT_GOBBLIN_CLUSTER_TASK_RUNNER_METRIC_REPORTING_FAILURE_FATAL);
+
+    this.isEventReportingFailureFatal = ConfigUtils.getBoolean(this.clusterConfig,
+        GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_TASK_RUNNER_EVENT_REPORTING_FAILURE_FATAL,
+        GobblinClusterConfigurationKeys.DEFAULT_GOBBLIN_CLUSTER_TASK_RUNNER_EVENT_REPORTING_FAILURE_FATAL);
+
     logger.info("Configured GobblinTaskRunner work dir to: {}", this.appWorkPath.toString());
 
     // Set system properties passed in via application config. As an example, Helix uses System#getProperty() for ZK configuration
@@ -308,11 +321,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
     addInstanceTags();
 
     // Start metric reporting
-    if (this.containerMetrics.isPresent()) {
-      this.containerMetrics.get()
-          .startMetricReportingWithFileSuffix(ConfigUtils.configToState(this.clusterConfig),
-              this.taskRunnerId);
-    }
+    initMetricReporter();
 
     if (this.serviceManager != null) {
       this.serviceManager.startAsync();
@@ -323,6 +332,32 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
     }
   }
 
+  /**
+   * Starts metric and event reporting for this container when container metrics are present.
+   * A reporter that fails to start aborts startup with an unchecked exception only when the
+   * corresponding "is*ReportingFailureFatal" flag is set; otherwise the failure is just logged.
+   */
+  private void initMetricReporter() {
+    if (this.containerMetrics.isPresent()) {
+      try {
+        this.containerMetrics.get()
+            .startMetricReportingWithFileSuffix(ConfigUtils.configToState(this.clusterConfig), this.taskRunnerId);
+      } catch (MetricReporterException e) {
+        if (this.isMetricReportingFailureFatal) {
+          throw Throwables.propagate(e);
+        } else {
+          logger.error("Failed to start {} metric reporter", e.getType().name(), e);
+        }
+      } catch (EventReporterException e) {
+        if (this.isEventReportingFailureFatal) {
+          throw Throwables.propagate(e);
+        } else {
+          logger.error("Failed to start {} event reporter", e.getType().name(), e);
+        }
+      }
+    }
+  }
+
   public synchronized void stop() {
     if (this.isStopped) {
       logger.info("Gobblin Task runner is already stopped.");
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
index 98d13df..ee76ede 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
@@ -43,6 +43,7 @@ import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.gobblin.cluster.suite.IntegrationBasicSuite;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.testing.AssertWithBackoff;
 
 
@@ -71,6 +72,7 @@ public class GobblinTaskRunnerTest {
 
   private GobblinClusterManager gobblinClusterManager;
   private GobblinTaskRunner corruptGobblinTaskRunner;
+  private GobblinTaskRunner gobblinTaskRunnerFailedReporter;
   private String clusterName;
   private String corruptHelixInstance;
   private TaskAssignmentAfterConnectionRetry suite;
@@ -103,6 +105,18 @@ public class GobblinTaskRunnerTest {
             TestHelper.TEST_APPLICATION_ID, TestHelper.TEST_TASK_RUNNER_ID, config, Optional.<Path>absent());
     this.gobblinTaskRunner.connectHelixManager();
 
+    // Participant that fails to start due to metric reporter failures
+    String instanceName = HelixUtils.getHelixInstanceName("MetricReporterFailureInstance", 0);
+
+    Config metricConfig = config.withValue(ConfigurationKeys.METRICS_ENABLED_KEY, ConfigValueFactory.fromAnyRef(true))
+        .withValue(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY, ConfigValueFactory.fromAnyRef(true))
+        .withValue(ConfigurationKeys.METRICS_KAFKA_TOPIC_METRICS, ConfigValueFactory.fromAnyRef("metricTopic"))
+        .withValue(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_TASK_RUNNER_METRIC_REPORTING_FAILURE_FATAL, ConfigValueFactory.fromAnyRef(true));
+
+    this.gobblinTaskRunnerFailedReporter =
+        new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, instanceName,
+            TestHelper.TEST_APPLICATION_ID, TestHelper.TEST_TASK_RUNNER_ID, metricConfig, Optional.<Path>absent());
+
     // Participant with a partial Instance set up on Helix/ZK
     this.corruptHelixInstance = HelixUtils.getHelixInstanceName("CorruptHelixInstance", 0);
     this.corruptGobblinTaskRunner =
@@ -142,6 +156,15 @@ public class GobblinTaskRunnerTest {
       }, "gobblinTaskRunner stopped");
   }
 
+  /**
+   * Starting a task runner whose Kafka metric reporter cannot be created must fail when
+   * metric reporting failures are configured as fatal.
+   */
+  @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = ".*Could not create metric reporter.*")
+  public void testStartUpFailsDueToMetricReporterFailure() {
+    this.gobblinTaskRunnerFailedReporter.start();
+  }
+
   @Test
   public void testBuildFileSystemConfig() {
     FileSystem fileSystem = this.gobblinTaskRunner.getFs();
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
index 5a937ed..78feb64 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
@@ -68,7 +68,9 @@ import org.apache.gobblin.compaction.verify.DataCompletenessVerifier;
 import org.apache.gobblin.compaction.verify.DataCompletenessVerifier.Results;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.EventReporterException;
 import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricReporterException;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.util.ClassAliasResolver;
@@ -364,7 +366,13 @@ public class MRCompactor implements Compactor {
     tags.addAll(Tag.fromMap(ClusterNameTags.getClusterNameTags()));
     GobblinMetrics gobblinMetrics =
         GobblinMetrics.get(this.state.getProp(ConfigurationKeys.JOB_NAME_KEY), null, tags.build());
-    gobblinMetrics.startMetricReporting(this.state.getProperties());
+    try {
+      gobblinMetrics.startMetricReporting(this.state.getProperties());
+    } catch (MetricReporterException e) {
+      LOG.error("Failed to start {} metric reporter.", e.getType().name(), e);
+    } catch (EventReporterException e) {
+      LOG.error("Failed to start {} event reporter.", e.getType().name(), e);
+    }
     return gobblinMetrics;
   }
 
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/EventReporterException.java
similarity index 53%
copy from gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
copy to gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/EventReporterException.java
index 4a357c8..7804000 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/EventReporterException.java
@@ -14,29 +14,32 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.gobblin.metrics;
 
-import java.util.Properties;
+import java.io.IOException;
 
-import org.testng.Assert;
-import org.testng.annotations.Test;
+import lombok.Getter;
 
+/**
+ * Signals that the event reporter for a {@link ReporterSinkType} could not be built or started.
+ */
+public class EventReporterException extends IOException {
+  /** The sink whose event reporter failed. */
+  @Getter
+  private final ReporterSinkType type;
 
-@Test
-public class GobblinMetricsTest {
+  public EventReporterException(Throwable t, ReporterSinkType type) {
+    super(t);
+    this.type = type;
+  }
 
-  /**
-   * Test the {@link GobblinMetrics} instance is removed from {@link GobblinMetricsRegistry} when
-   * it stops metrics reporting
-   */
-  public void testStopReportingMetrics() {
-    String id = getClass().getSimpleName() + "-" + System.currentTimeMillis();
-    GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
-    gobblinMetrics.startMetricReporting(new Properties());
-    Assert.assertEquals(GobblinMetricsRegistry.getInstance().get(id).get(), gobblinMetrics);
+  public EventReporterException(String message, ReporterSinkType type) {
+    super(message);
+    this.type = type;
+  }
 
-    gobblinMetrics.stopMetricsReporting();
-    Assert.assertFalse(GobblinMetricsRegistry.getInstance().get(id).isPresent());
+  public EventReporterException(String message, Throwable t, ReporterSinkType type) {
+    super(message, t);
+    this.type = type;
   }
 }
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
index e969083..4af9fde 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
@@ -27,7 +27,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.gobblin.metrics.reporter.FileFailureEventReporter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -48,6 +47,8 @@ import com.google.common.collect.Lists;
 import com.google.common.io.Closer;
 import com.typesafe.config.Config;
 
+import lombok.Getter;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metrics.graphite.GraphiteConnectionType;
@@ -56,14 +57,13 @@ import org.apache.gobblin.metrics.graphite.GraphiteReporter;
 import org.apache.gobblin.metrics.influxdb.InfluxDBConnectionType;
 import org.apache.gobblin.metrics.influxdb.InfluxDBEventReporter;
 import org.apache.gobblin.metrics.influxdb.InfluxDBReporter;
+import org.apache.gobblin.metrics.reporter.FileFailureEventReporter;
 import org.apache.gobblin.metrics.reporter.OutputStreamEventReporter;
 import org.apache.gobblin.metrics.reporter.OutputStreamReporter;
 import org.apache.gobblin.metrics.reporter.ScheduledReporter;
 import org.apache.gobblin.password.PasswordManager;
 import org.apache.gobblin.util.PropertiesUtils;
 
-import lombok.Getter;
-
 
 /**
  * A class that represents a set of metrics associated with a given name.
@@ -354,7 +354,8 @@ public class GobblinMetrics {
    * Starts metric reporting and appends the given metrics file suffix to the current value of
    * {@link ConfigurationKeys#METRICS_FILE_SUFFIX}.
    */
-  public void startMetricReportingWithFileSuffix(State state, String metricsFileSuffix) {
+  public void startMetricReportingWithFileSuffix(State state, String metricsFileSuffix)
+      throws MetricReporterException, EventReporterException {
     Properties metricsReportingProps = new Properties();
     metricsReportingProps.putAll(state.getProperties());
 
@@ -374,7 +375,8 @@ public class GobblinMetrics {
    *
    * @param configuration configuration properties
    */
-  public void startMetricReporting(Configuration configuration) {
+  public void startMetricReporting(Configuration configuration)
+      throws MetricReporterException, EventReporterException {
     Properties props = new Properties();
     for (Map.Entry<String, String> entry : configuration) {
       props.put(entry.getKey(), entry.getValue());
@@ -387,7 +389,8 @@ public class GobblinMetrics {
    *
    * @param properties configuration properties
    */
-  public void startMetricReporting(Properties properties) {
+  public void startMetricReporting(Properties properties)
+      throws MetricReporterException, EventReporterException {
     if (this.metricsReportingStarted) {
       LOGGER.warn("Metric reporting has already started");
       return;
@@ -473,7 +476,8 @@ public class GobblinMetrics {
     LOGGER.info("Metrics reporting stopped successfully");
   }
 
-  private void buildFileMetricReporter(Properties properties) {
+  private void buildFileMetricReporter(Properties properties)
+      throws MetricReporterException {
     if (!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_FILE_ENABLED_KEY,
         ConfigurationKeys.DEFAULT_METRICS_REPORTING_FILE_ENABLED))) {
       return;
@@ -481,9 +485,8 @@ public class GobblinMetrics {
     LOGGER.info("Reporting metrics to log files");
 
     if (!properties.containsKey(ConfigurationKeys.METRICS_LOG_DIR_KEY)) {
-      LOGGER.error(
-          "Not reporting metrics to log files because " + ConfigurationKeys.METRICS_LOG_DIR_KEY + " is undefined");
-      return;
+      throw new MetricReporterException(
+          "Not reporting metrics to log files because " + ConfigurationKeys.METRICS_LOG_DIR_KEY + " is undefined", ReporterSinkType.FILE);
     }
 
     try {
@@ -493,8 +496,7 @@ public class GobblinMetrics {
       // Each job gets its own metric log subdirectory
       Path metricsLogDir = new Path(properties.getProperty(ConfigurationKeys.METRICS_LOG_DIR_KEY), this.getName());
       if (!fs.exists(metricsLogDir) && !fs.mkdirs(metricsLogDir)) {
-        LOGGER.error("Failed to create metric log directory for metrics " + this.getName());
-        return;
+        throw new MetricReporterException("Failed to create metric log directory for metrics " + this.getName(), ReporterSinkType.FILE);
       }
 
       // Add a suffix to file name if specified in properties.
@@ -523,11 +525,12 @@ public class GobblinMetrics {
 
       LOGGER.info("Will start reporting metrics to directory " + metricsLogDir);
     } catch (IOException ioe) {
-      LOGGER.error("Failed to build file metric reporter for job " + this.id, ioe);
+      throw new MetricReporterException("Failed to build file metric reporter for job " + this.id, ioe, ReporterSinkType.FILE);
     }
   }
 
-  private void buildFileFailureEventReporter(Properties properties) {
+  private void buildFileFailureEventReporter(Properties properties)
+      throws EventReporterException {
     if (!Boolean.valueOf(properties.getProperty(ConfigurationKeys.FAILURE_REPORTING_FILE_ENABLED_KEY,
         ConfigurationKeys.DEFAULT_FAILURE_REPORTING_FILE_ENABLED))) {
       return;
@@ -535,9 +538,8 @@ public class GobblinMetrics {
     LOGGER.info("Reporting failure to log files");
 
     if (!properties.containsKey(ConfigurationKeys.FAILURE_LOG_DIR_KEY)) {
-      LOGGER.error(
-          "Not reporting failure to log files because " + ConfigurationKeys.FAILURE_LOG_DIR_KEY + " is undefined");
-      return;
+      throw new EventReporterException(
+          "Not reporting failure to log files because " + ConfigurationKeys.FAILURE_LOG_DIR_KEY + " is undefined", ReporterSinkType.FILE);
     }
 
     try {
@@ -547,8 +549,7 @@ public class GobblinMetrics {
       // Each job gets its own log subdirectory
       Path failureLogDir = new Path(properties.getProperty(ConfigurationKeys.FAILURE_LOG_DIR_KEY), this.getName());
       if (!fs.exists(failureLogDir) && !fs.mkdirs(failureLogDir)) {
-        LOGGER.error("Failed to create failure log directory for metrics " + this.getName());
-        return;
+        throw new EventReporterException("Failed to create failure log directory for metrics " + this.getName(), ReporterSinkType.FILE);
       }
 
       // Add a suffix to file name if specified in properties.
@@ -566,7 +567,7 @@ public class GobblinMetrics {
 
       LOGGER.info("Will start reporting failure to directory " + failureLogDir);
     } catch (IOException ioe) {
-      LOGGER.error("Failed to build file failure event reporter for job " + this.id, ioe);
+      throw new EventReporterException("Failed to build file failure event reporter for job " + this.id, ioe, ReporterSinkType.FILE);
     }
   }
 
@@ -581,7 +582,8 @@ public class GobblinMetrics {
         convertRatesTo(TimeUnit.SECONDS).convertDurationsTo(TimeUnit.MILLISECONDS).build()));
   }
 
-  private void buildKafkaMetricReporter(Properties properties) {
+  private void buildKafkaMetricReporter(Properties properties)
+      throws MetricReporterException, EventReporterException {
     if (!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY,
         ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_ENABLED))) {
       return;
@@ -589,7 +591,8 @@ public class GobblinMetrics {
     buildScheduledReporter(properties, ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_REPORTER_CLASS, Optional.of("Kafka"));
   }
 
-  private void buildGraphiteMetricReporter(Properties properties) {
+  private void buildGraphiteMetricReporter(Properties properties)
+      throws MetricReporterException, EventReporterException {
     boolean metricsEnabled = PropertiesUtils
         .getPropAsBoolean(properties, ConfigurationKeys.METRICS_REPORTING_GRAPHITE_METRICS_ENABLED_KEY,
             ConfigurationKeys.DEFAULT_METRICS_REPORTING_GRAPHITE_METRICS_ENABLED);
@@ -612,8 +615,7 @@ public class GobblinMetrics {
       Preconditions.checkArgument(properties.containsKey(ConfigurationKeys.METRICS_REPORTING_GRAPHITE_HOSTNAME),
           "Graphite hostname is missing.");
     } catch (IllegalArgumentException exception) {
-      LOGGER.error("Not reporting to Graphite due to missing Graphite configuration(s).", exception);
-      return;
+      throw new MetricReporterException("Missing Graphite configuration(s).", exception, ReporterSinkType.GRAPHITE);
     }
 
     String hostname = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_GRAPHITE_HOSTNAME);
@@ -641,7 +643,7 @@ public class GobblinMetrics {
             .withMetricsPrefix(prefix)
             .build(properties);
       } catch (IOException e) {
-        LOGGER.error("Failed to create Graphite metrics reporter. Will not report metrics to Graphite.", e);
+        throw new MetricReporterException("Failed to create Graphite metrics reporter.", e, ReporterSinkType.GRAPHITE);
       }
     }
 
@@ -663,12 +665,13 @@ public class GobblinMetrics {
         this.codahaleScheduledReporters.add(this.codahaleReportersCloser.register(eventReporter));
       }
       catch (IOException e) {
-        LOGGER.error("Failed to create Graphite event reporter. Will not report events to Graphite.", e);
+        throw new EventReporterException("Failed to create Graphite event reporter.", e, ReporterSinkType.GRAPHITE);
       }
     }
   }
 
-  private void buildInfluxDBMetricReporter(Properties properties) {
+  private void buildInfluxDBMetricReporter(Properties properties)
+      throws MetricReporterException, EventReporterException {
     boolean metricsEnabled = PropertiesUtils
         .getPropAsBoolean(properties, ConfigurationKeys.METRICS_REPORTING_INFLUXDB_METRICS_ENABLED_KEY,
             ConfigurationKeys.DEFAULT_METRICS_REPORTING_INFLUXDB_METRICS_ENABLED);
@@ -691,8 +694,7 @@ public class GobblinMetrics {
       Preconditions.checkArgument(properties.containsKey(ConfigurationKeys.METRICS_REPORTING_INFLUXDB_DATABASE),
           "InfluxDB database name is missing.");
     } catch (IllegalArgumentException exception) {
-      LOGGER.error("Not reporting to InfluxDB due to missing InfluxDB configuration(s).", exception);
-      return;
+      throw new MetricReporterException("Missing InfluxDB configuration(s)", exception, ReporterSinkType.INFLUXDB);
     }
 
     String url = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_INFLUXDB_URL);
@@ -719,7 +721,7 @@ public class GobblinMetrics {
             this.metricContext.getName()) // contains the current job id
             .build(properties);
       } catch (IOException e) {
-        LOGGER.error("Failed to create InfluxDB metrics reporter. Will not report metrics to InfluxDB.", e);
+        throw new MetricReporterException("Failed to create InfluxDB metrics reporter.", e, ReporterSinkType.INFLUXDB);
       }
     }
 
@@ -735,7 +737,7 @@ public class GobblinMetrics {
         this.codahaleScheduledReporters.add(this.codahaleReportersCloser.register(eventReporter));
       }
       catch (IOException e) {
-        LOGGER.error("Failed to create InfluxDB event reporter. Will not report events to InfluxDB.", e);
+        throw new EventReporterException("Failed to create InfluxDB event reporter.", e, ReporterSinkType.INFLUXDB);
       }
     }
   }
@@ -745,7 +747,8 @@ public class GobblinMetrics {
    * {@link org.apache.gobblin.configuration.ConfigurationKeys#METRICS_CUSTOM_BUILDERS}. This allows users to specify custom
    * reporters for Gobblin runtime without having to modify the code.
    */
-  private void buildCustomMetricReporters(Properties properties) {
+  private void buildCustomMetricReporters(Properties properties)
+      throws MetricReporterException, EventReporterException {
     String reporterClasses = properties.getProperty(ConfigurationKeys.METRICS_CUSTOM_BUILDERS);
 
     if (Strings.isNullOrEmpty(reporterClasses)) {
@@ -757,7 +760,8 @@ public class GobblinMetrics {
     }
   }
 
-  private void buildScheduledReporter(Properties properties, String reporterClass, Optional<String> reporterSink) {
+  private void buildScheduledReporter(Properties properties, String reporterClass, Optional<String> reporterSink)
+      throws MetricReporterException, EventReporterException {
     try {
       Class<?> clazz = Class.forName(reporterClass);
 
@@ -779,19 +783,21 @@ public class GobblinMetrics {
         customReporterFactory.newScheduledReporter(properties);
         LOGGER.info("Will start reporting metrics using " + reporterClass);
       } else {
-        throw new IllegalArgumentException("Class " + reporterClass +
+        throw new MetricReporterException("Class " + reporterClass +
             " specified by key " + ConfigurationKeys.METRICS_CUSTOM_BUILDERS + " must implement: "
-            + CustomCodahaleReporterFactory.class + " or " + CustomReporterFactory.class);
+            + CustomCodahaleReporterFactory.class + " or " + CustomReporterFactory.class, ReporterSinkType.CUSTOM);
       }
     } catch (ClassNotFoundException exception) {
-      LOGGER.warn(String
+      throw new MetricReporterException(String
           .format("Failed to create metric reporter: requested CustomReporterFactory %s not found.", reporterClass),
-          exception);
+          exception, ReporterSinkType.CUSTOM);
     } catch (NoSuchMethodException exception) {
-      LOGGER.warn(String.format("Failed to create metric reporter: requested CustomReporterFactory %s "
-          + "does not have parameterless constructor.", reporterClass), exception);
+      throw new MetricReporterException(String.format("Failed to create metric reporter: requested CustomReporterFactory %s "
+          + "does not have parameterless constructor.", reporterClass), exception, ReporterSinkType.CUSTOM);
+    } catch (EventReporterException exception) {
+      throw exception;
     } catch (Exception exception) {
-      LOGGER.warn("Could not create metric reporter from builder " + reporterClass + ".", exception);
+      throw new MetricReporterException("Could not create metric reporter from builder " + reporterClass + ".", exception, ReporterSinkType.CUSTOM);
     }
   }
 }
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/MetricReporterException.java
similarity index 53%
copy from gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
copy to gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/MetricReporterException.java
index 4a357c8..8631df3 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/MetricReporterException.java
@@ -14,29 +14,32 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.gobblin.metrics;
 
-import java.util.Properties;
+import java.io.IOException;
 
-import org.testng.Assert;
-import org.testng.annotations.Test;
+import lombok.Getter;
 
+/**
+ * Signals that the metric reporter for a {@link ReporterSinkType} could not be built or started.
+ */
+public class MetricReporterException extends IOException {
+  /** The sink whose metric reporter failed. */
+  @Getter
+  private final ReporterSinkType type;
 
-@Test
-public class GobblinMetricsTest {
+  public MetricReporterException(Throwable t, ReporterSinkType type) {
+    super(t);
+    this.type = type;
+  }
 
-  /**
-   * Test the {@link GobblinMetrics} instance is removed from {@link GobblinMetricsRegistry} when
-   * it stops metrics reporting
-   */
-  public void testStopReportingMetrics() {
-    String id = getClass().getSimpleName() + "-" + System.currentTimeMillis();
-    GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
-    gobblinMetrics.startMetricReporting(new Properties());
-    Assert.assertEquals(GobblinMetricsRegistry.getInstance().get(id).get(), gobblinMetrics);
+  public MetricReporterException(String message, ReporterSinkType type) {
+    super(message);
+    this.type = type;
+  }
 
-    gobblinMetrics.stopMetricsReporting();
-    Assert.assertFalse(GobblinMetricsRegistry.getInstance().get(id).isPresent());
+  public MetricReporterException(String message, Throwable t, ReporterSinkType type) {
+    super(message, t);
+    this.type = type;
   }
 }
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ReporterSinkType.java
similarity index 53%
copy from gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
copy to gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ReporterSinkType.java
index 4a357c8..ab5398e 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ReporterSinkType.java
@@ -14,29 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.gobblin.metrics;
 
-import java.util.Properties;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-
-@Test
-public class GobblinMetricsTest {
-
-  /**
-   * Test the {@link GobblinMetrics} instance is removed from {@link GobblinMetricsRegistry} when
-   * it stops metrics reporting
-   */
-  public void testStopReportingMetrics() {
-    String id = getClass().getSimpleName() + "-" + System.currentTimeMillis();
-    GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
-    gobblinMetrics.startMetricReporting(new Properties());
-    Assert.assertEquals(GobblinMetricsRegistry.getInstance().get(id).get(), gobblinMetrics);
-
-    gobblinMetrics.stopMetricsReporting();
-    Assert.assertFalse(GobblinMetricsRegistry.getInstance().get(id).isPresent());
-  }
+/**
+ * Identifies the sink a metric or event reporter targets. Carried by {@link MetricReporterException}
+ * and {@link EventReporterException} so callers can tell which reporter failed to start.
+ */
+public enum ReporterSinkType {
+  GRAPHITE,
+  FILE,
+  KAFKA,
+  INFLUXDB,
+  CONSOLE,
+  CUSTOM
 }
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
index 4a357c8..40f9438 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
@@ -22,21 +22,74 @@ import java.util.Properties;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
 
 @Test
 public class GobblinMetricsTest {
-
   /**
    * Test the {@link GobblinMetrics} instance is removed from {@link GobblinMetricsRegistry} when
    * it stops metrics reporting
    */
-  public void testStopReportingMetrics() {
+  public void testStopReportingMetrics()
+      throws MetricReporterException, EventReporterException {
     String id = getClass().getSimpleName() + "-" + System.currentTimeMillis();
     GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
-    gobblinMetrics.startMetricReporting(new Properties());
+
+    Properties properties = new Properties();
+    properties.put(ConfigurationKeys.FAILURE_REPORTING_FILE_ENABLED_KEY, "false");
+
+    gobblinMetrics.startMetricReporting(properties);
     Assert.assertEquals(GobblinMetricsRegistry.getInstance().get(id).get(), gobblinMetrics);
 
     gobblinMetrics.stopMetricsReporting();
     Assert.assertFalse(GobblinMetricsRegistry.getInstance().get(id).isPresent());
   }
+
+  public void testMetricFileReporterThrowsException() {
+    String id = getClass().getSimpleName() + "-" + System.currentTimeMillis();
+    GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
+
+    //Enable file reporter without specifying metrics.log.dir.
+    Config config = ConfigFactory.empty()
+        .withValue(ConfigurationKeys.METRICS_REPORTING_FILE_ENABLED_KEY, ConfigValueFactory.fromAnyRef(true));
+
+    Properties properties = ConfigUtils.configToProperties(config);
+    //Ensure MetricReporterException is thrown
+    try {
+      gobblinMetrics.startMetricReporting(properties);
+      Assert.fail("Metric reporting unexpectedly succeeded.");
+    } catch (MetricReporterException e) {
+      //Do nothing. Expected to be here.
+    } catch (EventReporterException e) {
+      Assert.fail("Unexpected exception " + e.getMessage());
+    }
+  }
+
+  public void testMetricFileReporterSuccessful() {
+    String id = getClass().getSimpleName() + "-" + System.currentTimeMillis();
+    GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
+
+    //Enable file reporter with both the metrics and failure log directories specified.
+    Config config = ConfigFactory.empty()
+        .withValue(ConfigurationKeys.METRICS_REPORTING_FILE_ENABLED_KEY, ConfigValueFactory.fromAnyRef(true))
+        .withValue(ConfigurationKeys.METRICS_LOG_DIR_KEY, ConfigValueFactory.fromAnyRef("/tmp"))
+        .withValue(ConfigurationKeys.FAILURE_LOG_DIR_KEY, ConfigValueFactory.fromAnyRef("/tmp"));
+
+    Properties properties = ConfigUtils.configToProperties(config);
+    //Ensure neither MetricReporterException nor EventReporterException is thrown
+    try {
+      gobblinMetrics.startMetricReporting(properties);
+    } catch (MetricReporterException e) {
+      Assert.fail("Unexpected exception " + e.getMessage());
+    } catch (EventReporterException e) {
+      Assert.fail("Unexpected exception " + e.getMessage());
+    }
+  }
+
 }
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
index 5c12360..4c64c31 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
@@ -29,7 +29,10 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.CustomCodahaleReporterFactory;
+import org.apache.gobblin.metrics.EventReporterException;
 import org.apache.gobblin.metrics.KafkaReportingFormats;
+import org.apache.gobblin.metrics.MetricReporterException;
+import org.apache.gobblin.metrics.ReporterSinkType;
 import org.apache.gobblin.metrics.RootMetricContext;
 import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
 
@@ -37,7 +40,8 @@ import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
 @Slf4j
 public class KafkaReporterFactory implements CustomCodahaleReporterFactory {
   @Override
-  public ScheduledReporter newScheduledReporter(MetricRegistry registry, Properties properties) {
+  public ScheduledReporter newScheduledReporter(MetricRegistry registry, Properties properties)
+      throws IOException {
     if (!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY,
         ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_ENABLED))) {
       return null;
@@ -63,8 +67,7 @@ public class KafkaReporterFactory implements CustomCodahaleReporterFactory {
           "Kafka metrics brokers missing.");
       Preconditions.checkArgument(KafkaReporterUtils.getMetricsTopic(properties).or(eventsTopic).or(defaultTopic).isPresent(), "Kafka topic missing.");
     } catch (IllegalArgumentException exception) {
-      log.error("Not reporting metrics to Kafka due to missing Kafka configuration(s).", exception);
-      return null;
+      throw new MetricReporterException("Missing Kafka configuration(s).", exception, ReporterSinkType.KAFKA);
     }
 
     String brokers = properties.getProperty(ConfigurationKeys.METRICS_KAFKA_BROKERS);
@@ -87,7 +90,7 @@ public class KafkaReporterFactory implements CustomCodahaleReporterFactory {
       try {
         formatEnum.buildMetricsReporter(brokers, metricsTopic.or(defaultTopic).get(), properties);
       } catch (IOException exception) {
-        log.error("Failed to create Kafka metrics reporter. Will not report metrics to Kafka.", exception);
+        throw new MetricReporterException("Failed to create Kafka metrics reporter.", exception, ReporterSinkType.KAFKA);
       }
     }
 
@@ -115,7 +118,7 @@ public class KafkaReporterFactory implements CustomCodahaleReporterFactory {
 
         return reporter;
       } catch (IOException exception) {
-        log.error("Failed to create Kafka events reporter. Will not report events to Kafka.", exception);
+        throw new EventReporterException("Failed to create Kafka events reporter.", exception, ReporterSinkType.KAFKA);
       }
     }
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index c649595..b509f09 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -73,7 +73,9 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.fsm.FiniteStateMachine;
 import org.apache.gobblin.metastore.FsStateStore;
 import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metrics.EventReporterException;
 import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricReporterException;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.CountEventBuilder;
 import org.apache.gobblin.metrics.event.JobEvent;
@@ -783,8 +785,14 @@ public class MRJobLauncher extends AbstractJobLauncher {
       if (Boolean.valueOf(
           configuration.get(ConfigurationKeys.METRICS_ENABLED_KEY, ConfigurationKeys.DEFAULT_METRICS_ENABLED))) {
         this.jobMetrics = Optional.of(JobMetrics.get(this.jobState));
-        this.jobMetrics.get()
-            .startMetricReportingWithFileSuffix(gobblinJobState, context.getTaskAttemptID().toString());
+        try {
+          this.jobMetrics.get()
+              .startMetricReportingWithFileSuffix(gobblinJobState, context.getTaskAttemptID().toString());
+        } catch (MetricReporterException e) {
+          LOG.error("Failed to start {} metric reporter.", e.getType().name(), e);
+        } catch (EventReporterException e) {
+          LOG.error("Failed to start {} event reporter.", e.getType().name(), e);
+        }
       }
     }
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/MetricsReportingService.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/MetricsReportingService.java
index 43b53cd..2456b66 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/MetricsReportingService.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/MetricsReportingService.java
@@ -21,12 +21,17 @@ import java.util.Properties;
 
 import com.google.common.util.concurrent.AbstractIdleService;
 
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.EventReporterException;
 import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricReporterException;
 
 
 /**
  * A {@link com.google.common.util.concurrent.Service} for handling life cycle events around {@link GobblinMetrics}.
  */
+@Slf4j
 public class MetricsReportingService extends AbstractIdleService {
 
   private final Properties properties;
@@ -39,7 +44,13 @@ public class MetricsReportingService extends AbstractIdleService {
 
   @Override
   protected void startUp() throws Exception {
-    GobblinMetrics.get(this.appId).startMetricReporting(this.properties);
+    try {
+      GobblinMetrics.get(this.appId).startMetricReporting(this.properties);
+    } catch (MetricReporterException e) {
+      log.error("Failed to start {} metric reporter", e.getType().name(), e);
+    } catch (EventReporterException e) {
+      log.error("Failed to start {} event reporter", e.getType().name(), e);
+    }
   }
 
   @Override
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 3f127eb..5e1946e 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -99,7 +99,9 @@ import org.apache.gobblin.cluster.GobblinClusterUtils;
 import org.apache.gobblin.cluster.HelixUtils;
 import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.EventReporterException;
 import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricReporterException;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.util.ConfigUtils;
@@ -402,7 +404,13 @@ public class YarnService extends AbstractIdleService {
 
     // Intialize Gobblin metrics and start reporters
     GobblinMetrics gobblinMetrics = GobblinMetrics.get(this.applicationId, null, tags.build());
-    gobblinMetrics.startMetricReporting(ConfigUtils.configToProperties(config));
+    try {
+      gobblinMetrics.startMetricReporting(ConfigUtils.configToProperties(config));
+    } catch (MetricReporterException  e) {
+      LOGGER.error("Failed to start {} metric reporter.", e.getType().name(), e);
+    } catch (EventReporterException e) {
+      LOGGER.error("Failed to start {} event reporter.", e.getType().name(), e);
+    }
 
     return gobblinMetrics;
   }