You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/05/20 18:57:43 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1127][GOBBLIN-1153] Revert " Provide an option to make metric reporting instantiatio…"
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2305c26 [GOBBLIN-1127][GOBBLIN-1153] Revert " Provide an option to make metric reporting instantiatio…"
2305c26 is described below
commit 2305c26cd5813dbae88b4922fecc6d6737c460d9
Author: sv2000 <su...@gmail.com>
AuthorDate: Wed May 20 11:57:35 2020 -0700
[GOBBLIN-1127][GOBBLIN-1153] Revert " Provide an option to make metric reporting instantiatio…"
This reverts commit 239115778a08590d7ab5dfa32334efabe0e4fb49.
Closes #2992 from sv2000/revertMetricReportFailure
---
.../cluster/GobblinClusterConfigurationKeys.java | 7 --
.../apache/gobblin/cluster/GobblinTaskRunner.java | 40 ++--------
.../gobblin/cluster/GobblinTaskRunnerTest.java | 19 -----
.../gobblin/compaction/mapreduce/MRCompactor.java | 10 +--
.../gobblin/metrics/EventReporterException.java | 41 -----------
.../org/apache/gobblin/metrics/GobblinMetrics.java | 86 ++++++++++------------
.../gobblin/metrics/MetricReporterException.java | 41 -----------
.../apache/gobblin/metrics/ReporterSinkType.java | 26 -------
.../apache/gobblin/metrics/GobblinMetricsTest.java | 59 +--------------
.../metrics/kafka/KafkaReporterFactory.java | 13 ++--
.../gobblin/runtime/mapreduce/MRJobLauncher.java | 12 +--
.../runtime/services/MetricsReportingService.java | 13 +---
.../java/org/apache/gobblin/yarn/YarnService.java | 10 +--
13 files changed, 58 insertions(+), 319 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index b545928..128a5d6 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -197,11 +197,4 @@ public class GobblinClusterConfigurationKeys {
public static final String CONTAINER_ID_KEY = GOBBLIN_HELIX_PREFIX + "containerId";
public static final String GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX = GOBBLIN_CLUSTER_PREFIX + "sysProps";
-
- //Gobblin TaskRunner configuration keys
- public static final String GOBBLIN_CLUSTER_TASK_RUNNER_METRIC_REPORTING_FAILURE_FATAL = "gobblin.cluster.taskRunner.isMetricReportingFailureFatal";
- public static final boolean DEFAULT_GOBBLIN_CLUSTER_TASK_RUNNER_METRIC_REPORTING_FAILURE_FATAL = false;
-
- public static final String GOBBLIN_CLUSTER_TASK_RUNNER_EVENT_REPORTING_FAILURE_FATAL = "gobblin.cluster.taskRunner.isEventReportingFailureFatal";
- public static final boolean DEFAULT_GOBBLIN_CLUSTER_TASK_RUNNER_EVENT_REPORTING_FAILURE_FATAL = false;
}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 27f3da8..103f0cc 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -83,9 +83,7 @@ import lombok.Getter;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.instrumented.StandardMetricsBridge;
-import org.apache.gobblin.metrics.EventReporterException;
import org.apache.gobblin.metrics.GobblinMetrics;
-import org.apache.gobblin.metrics.MetricReporterException;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.FileUtils;
@@ -153,8 +151,6 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
protected final FileSystem fs;
protected final String applicationName;
protected final String applicationId;
- private final boolean isMetricReportingFailureFatal;
- private final boolean isEventReportingFailureFatal;
public GobblinTaskRunner(String applicationName,
String helixInstanceName,
@@ -174,15 +170,6 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
this.appWorkPath = initAppWorkDir(config, appWorkDirOptional);
this.clusterConfig = saveConfigToFile(config);
this.clusterName = this.clusterConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
-
- this.isMetricReportingFailureFatal = ConfigUtils.getBoolean(this.clusterConfig,
- GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_TASK_RUNNER_METRIC_REPORTING_FAILURE_FATAL,
- GobblinClusterConfigurationKeys.DEFAULT_GOBBLIN_CLUSTER_TASK_RUNNER_METRIC_REPORTING_FAILURE_FATAL);
-
- this.isEventReportingFailureFatal = ConfigUtils.getBoolean(this.clusterConfig,
- GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_TASK_RUNNER_EVENT_REPORTING_FAILURE_FATAL,
- GobblinClusterConfigurationKeys.DEFAULT_GOBBLIN_CLUSTER_TASK_RUNNER_EVENT_REPORTING_FAILURE_FATAL);
-
logger.info("Configured GobblinTaskRunner work dir to: {}", this.appWorkPath.toString());
// Set system properties passed in via application config. As an example, Helix uses System#getProperty() for ZK configuration
@@ -321,7 +308,11 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
addInstanceTags();
// Start metric reporting
- initMetricReporter();
+ if (this.containerMetrics.isPresent()) {
+ this.containerMetrics.get()
+ .startMetricReportingWithFileSuffix(ConfigUtils.configToState(this.clusterConfig),
+ this.taskRunnerId);
+ }
if (this.serviceManager != null) {
this.serviceManager.startAsync();
@@ -332,27 +323,6 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
}
}
- private void initMetricReporter() {
- if (this.containerMetrics.isPresent()) {
- try {
- this.containerMetrics.get()
- .startMetricReportingWithFileSuffix(ConfigUtils.configToState(this.clusterConfig), this.taskRunnerId);
- } catch (MetricReporterException e) {
- if (this.isMetricReportingFailureFatal) {
- Throwables.propagate(e);
- } else {
- logger.error("Failed to start {} event reporter", e.getType().name(), e);
- }
- } catch (EventReporterException e) {
- if (this.isEventReportingFailureFatal) {
- Throwables.propagate(e);
- } else {
- logger.error("Failed to start {} event reporter", e.getType().name(), e);
- }
- }
- }
- }
-
public synchronized void stop() {
if (this.isStopped) {
logger.info("Gobblin Task runner is already stopped.");
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
index ee76ede..98d13df 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
@@ -43,7 +43,6 @@ import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.cluster.suite.IntegrationBasicSuite;
-import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.testing.AssertWithBackoff;
@@ -72,7 +71,6 @@ public class GobblinTaskRunnerTest {
private GobblinClusterManager gobblinClusterManager;
private GobblinTaskRunner corruptGobblinTaskRunner;
- private GobblinTaskRunner gobblinTaskRunnerFailedReporter;
private String clusterName;
private String corruptHelixInstance;
private TaskAssignmentAfterConnectionRetry suite;
@@ -105,18 +103,6 @@ public class GobblinTaskRunnerTest {
TestHelper.TEST_APPLICATION_ID, TestHelper.TEST_TASK_RUNNER_ID, config, Optional.<Path>absent());
this.gobblinTaskRunner.connectHelixManager();
- // Participant that fails to start due to metric reporter failures
- String instanceName = HelixUtils.getHelixInstanceName("MetricReporterFailureInstance", 0);
-
- Config metricConfig = config.withValue(ConfigurationKeys.METRICS_ENABLED_KEY, ConfigValueFactory.fromAnyRef(true))
- .withValue(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY, ConfigValueFactory.fromAnyRef(true))
- .withValue(ConfigurationKeys.METRICS_KAFKA_TOPIC_METRICS, ConfigValueFactory.fromAnyRef("metricTopic"))
- .withValue(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_TASK_RUNNER_METRIC_REPORTING_FAILURE_FATAL, ConfigValueFactory.fromAnyRef(true));
-
- this.gobblinTaskRunnerFailedReporter =
- new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, instanceName,
- TestHelper.TEST_APPLICATION_ID, TestHelper.TEST_TASK_RUNNER_ID, metricConfig, Optional.<Path>absent());
-
// Participant with a partial Instance set up on Helix/ZK
this.corruptHelixInstance = HelixUtils.getHelixInstanceName("CorruptHelixInstance", 0);
this.corruptGobblinTaskRunner =
@@ -156,11 +142,6 @@ public class GobblinTaskRunnerTest {
}, "gobblinTaskRunner stopped");
}
- @Test (expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = ".*Could not create metric reporter.*")
- public void testStartUpFailsDueToMetricReporterFailure() {
- GobblinTaskRunnerTest.this.gobblinTaskRunnerFailedReporter.start();
- }
-
@Test
public void testBuildFileSystemConfig() {
FileSystem fileSystem = this.gobblinTaskRunner.getFs();
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
index 78feb64..5a937ed 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
@@ -68,9 +68,7 @@ import org.apache.gobblin.compaction.verify.DataCompletenessVerifier;
import org.apache.gobblin.compaction.verify.DataCompletenessVerifier.Results;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.metrics.EventReporterException;
import org.apache.gobblin.metrics.GobblinMetrics;
-import org.apache.gobblin.metrics.MetricReporterException;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.util.ClassAliasResolver;
@@ -366,13 +364,7 @@ public class MRCompactor implements Compactor {
tags.addAll(Tag.fromMap(ClusterNameTags.getClusterNameTags()));
GobblinMetrics gobblinMetrics =
GobblinMetrics.get(this.state.getProp(ConfigurationKeys.JOB_NAME_KEY), null, tags.build());
- try {
- gobblinMetrics.startMetricReporting(this.state.getProperties());
- } catch (MetricReporterException e) {
- LOG.error("Failed to start {} metric reporter.", e.getType().name(), e);
- } catch (EventReporterException e) {
- LOG.error("Failed to start {} event reporter.", e.getType().name(), e);
- }
+ gobblinMetrics.startMetricReporting(this.state.getProperties());
return gobblinMetrics;
}
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/EventReporterException.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/EventReporterException.java
deleted file mode 100644
index 7804000..0000000
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/EventReporterException.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gobblin.metrics;
-
-import java.io.IOException;
-
-import lombok.Getter;
-
-public class EventReporterException extends IOException {
- @Getter
- private final ReporterSinkType type;
-
- public EventReporterException(Throwable t, ReporterSinkType type) {
- super(t);
- this.type = type;
- }
-
- public EventReporterException(String message, ReporterSinkType type) {
- super(message);
- this.type = type;
- }
-
- public EventReporterException(String message, Throwable t, ReporterSinkType type) {
- super(message, t);
- this.type = type;
- }
-}
\ No newline at end of file
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
index 4af9fde..e969083 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
@@ -27,6 +27,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.gobblin.metrics.reporter.FileFailureEventReporter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -47,8 +48,6 @@ import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import com.typesafe.config.Config;
-import lombok.Getter;
-
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metrics.graphite.GraphiteConnectionType;
@@ -57,13 +56,14 @@ import org.apache.gobblin.metrics.graphite.GraphiteReporter;
import org.apache.gobblin.metrics.influxdb.InfluxDBConnectionType;
import org.apache.gobblin.metrics.influxdb.InfluxDBEventReporter;
import org.apache.gobblin.metrics.influxdb.InfluxDBReporter;
-import org.apache.gobblin.metrics.reporter.FileFailureEventReporter;
import org.apache.gobblin.metrics.reporter.OutputStreamEventReporter;
import org.apache.gobblin.metrics.reporter.OutputStreamReporter;
import org.apache.gobblin.metrics.reporter.ScheduledReporter;
import org.apache.gobblin.password.PasswordManager;
import org.apache.gobblin.util.PropertiesUtils;
+import lombok.Getter;
+
/**
* A class that represents a set of metrics associated with a given name.
@@ -354,8 +354,7 @@ public class GobblinMetrics {
* Starts metric reporting and appends the given metrics file suffix to the current value of
* {@link ConfigurationKeys#METRICS_FILE_SUFFIX}.
*/
- public void startMetricReportingWithFileSuffix(State state, String metricsFileSuffix)
- throws MetricReporterException, EventReporterException {
+ public void startMetricReportingWithFileSuffix(State state, String metricsFileSuffix) {
Properties metricsReportingProps = new Properties();
metricsReportingProps.putAll(state.getProperties());
@@ -375,8 +374,7 @@ public class GobblinMetrics {
*
* @param configuration configuration properties
*/
- public void startMetricReporting(Configuration configuration)
- throws MetricReporterException, EventReporterException {
+ public void startMetricReporting(Configuration configuration) {
Properties props = new Properties();
for (Map.Entry<String, String> entry : configuration) {
props.put(entry.getKey(), entry.getValue());
@@ -389,8 +387,7 @@ public class GobblinMetrics {
*
* @param properties configuration properties
*/
- public void startMetricReporting(Properties properties)
- throws MetricReporterException, EventReporterException {
+ public void startMetricReporting(Properties properties) {
if (this.metricsReportingStarted) {
LOGGER.warn("Metric reporting has already started");
return;
@@ -476,8 +473,7 @@ public class GobblinMetrics {
LOGGER.info("Metrics reporting stopped successfully");
}
- private void buildFileMetricReporter(Properties properties)
- throws MetricReporterException {
+ private void buildFileMetricReporter(Properties properties) {
if (!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_FILE_ENABLED_KEY,
ConfigurationKeys.DEFAULT_METRICS_REPORTING_FILE_ENABLED))) {
return;
@@ -485,8 +481,9 @@ public class GobblinMetrics {
LOGGER.info("Reporting metrics to log files");
if (!properties.containsKey(ConfigurationKeys.METRICS_LOG_DIR_KEY)) {
- throw new MetricReporterException(
- "Not reporting metrics to log files because " + ConfigurationKeys.METRICS_LOG_DIR_KEY + " is undefined", ReporterSinkType.FILE);
+ LOGGER.error(
+ "Not reporting metrics to log files because " + ConfigurationKeys.METRICS_LOG_DIR_KEY + " is undefined");
+ return;
}
try {
@@ -496,7 +493,8 @@ public class GobblinMetrics {
// Each job gets its own metric log subdirectory
Path metricsLogDir = new Path(properties.getProperty(ConfigurationKeys.METRICS_LOG_DIR_KEY), this.getName());
if (!fs.exists(metricsLogDir) && !fs.mkdirs(metricsLogDir)) {
- throw new MetricReporterException("Failed to create metric log directory for metrics " + this.getName(), ReporterSinkType.FILE);
+ LOGGER.error("Failed to create metric log directory for metrics " + this.getName());
+ return;
}
// Add a suffix to file name if specified in properties.
@@ -525,12 +523,11 @@ public class GobblinMetrics {
LOGGER.info("Will start reporting metrics to directory " + metricsLogDir);
} catch (IOException ioe) {
- throw new MetricReporterException("Failed to build file metric reporter for job " + this.id, ioe, ReporterSinkType.FILE);
+ LOGGER.error("Failed to build file metric reporter for job " + this.id, ioe);
}
}
- private void buildFileFailureEventReporter(Properties properties)
- throws EventReporterException {
+ private void buildFileFailureEventReporter(Properties properties) {
if (!Boolean.valueOf(properties.getProperty(ConfigurationKeys.FAILURE_REPORTING_FILE_ENABLED_KEY,
ConfigurationKeys.DEFAULT_FAILURE_REPORTING_FILE_ENABLED))) {
return;
@@ -538,8 +535,9 @@ public class GobblinMetrics {
LOGGER.info("Reporting failure to log files");
if (!properties.containsKey(ConfigurationKeys.FAILURE_LOG_DIR_KEY)) {
- throw new EventReporterException(
- "Not reporting failure to log files because " + ConfigurationKeys.FAILURE_LOG_DIR_KEY + " is undefined", ReporterSinkType.FILE);
+ LOGGER.error(
+ "Not reporting failure to log files because " + ConfigurationKeys.FAILURE_LOG_DIR_KEY + " is undefined");
+ return;
}
try {
@@ -549,7 +547,8 @@ public class GobblinMetrics {
// Each job gets its own log subdirectory
Path failureLogDir = new Path(properties.getProperty(ConfigurationKeys.FAILURE_LOG_DIR_KEY), this.getName());
if (!fs.exists(failureLogDir) && !fs.mkdirs(failureLogDir)) {
- throw new EventReporterException("Failed to create failure log directory for metrics " + this.getName(), ReporterSinkType.FILE);
+ LOGGER.error("Failed to create failure log directory for metrics " + this.getName());
+ return;
}
// Add a suffix to file name if specified in properties.
@@ -567,7 +566,7 @@ public class GobblinMetrics {
LOGGER.info("Will start reporting failure to directory " + failureLogDir);
} catch (IOException ioe) {
- throw new EventReporterException("Failed to build file failure event reporter for job " + this.id, ioe, ReporterSinkType.FILE);
+ LOGGER.error("Failed to build file failure event reporter for job " + this.id, ioe);
}
}
@@ -582,8 +581,7 @@ public class GobblinMetrics {
convertRatesTo(TimeUnit.SECONDS).convertDurationsTo(TimeUnit.MILLISECONDS).build()));
}
- private void buildKafkaMetricReporter(Properties properties)
- throws MetricReporterException, EventReporterException {
+ private void buildKafkaMetricReporter(Properties properties) {
if (!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY,
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_ENABLED))) {
return;
@@ -591,8 +589,7 @@ public class GobblinMetrics {
buildScheduledReporter(properties, ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_REPORTER_CLASS, Optional.of("Kafka"));
}
- private void buildGraphiteMetricReporter(Properties properties)
- throws MetricReporterException, EventReporterException {
+ private void buildGraphiteMetricReporter(Properties properties) {
boolean metricsEnabled = PropertiesUtils
.getPropAsBoolean(properties, ConfigurationKeys.METRICS_REPORTING_GRAPHITE_METRICS_ENABLED_KEY,
ConfigurationKeys.DEFAULT_METRICS_REPORTING_GRAPHITE_METRICS_ENABLED);
@@ -615,7 +612,8 @@ public class GobblinMetrics {
Preconditions.checkArgument(properties.containsKey(ConfigurationKeys.METRICS_REPORTING_GRAPHITE_HOSTNAME),
"Graphite hostname is missing.");
} catch (IllegalArgumentException exception) {
- throw new MetricReporterException("Missing Graphite configuration(s).", exception, ReporterSinkType.GRAPHITE);
+ LOGGER.error("Not reporting to Graphite due to missing Graphite configuration(s).", exception);
+ return;
}
String hostname = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_GRAPHITE_HOSTNAME);
@@ -643,7 +641,7 @@ public class GobblinMetrics {
.withMetricsPrefix(prefix)
.build(properties);
} catch (IOException e) {
- throw new MetricReporterException("Failed to create Graphite metrics reporter.", e, ReporterSinkType.GRAPHITE);
+ LOGGER.error("Failed to create Graphite metrics reporter. Will not report metrics to Graphite.", e);
}
}
@@ -665,13 +663,12 @@ public class GobblinMetrics {
this.codahaleScheduledReporters.add(this.codahaleReportersCloser.register(eventReporter));
}
catch (IOException e) {
- throw new EventReporterException("Failed to create Graphite event reporter.", e, ReporterSinkType.GRAPHITE);
+ LOGGER.error("Failed to create Graphite event reporter. Will not report events to Graphite.", e);
}
}
}
- private void buildInfluxDBMetricReporter(Properties properties)
- throws MetricReporterException, EventReporterException {
+ private void buildInfluxDBMetricReporter(Properties properties) {
boolean metricsEnabled = PropertiesUtils
.getPropAsBoolean(properties, ConfigurationKeys.METRICS_REPORTING_INFLUXDB_METRICS_ENABLED_KEY,
ConfigurationKeys.DEFAULT_METRICS_REPORTING_INFLUXDB_METRICS_ENABLED);
@@ -694,7 +691,8 @@ public class GobblinMetrics {
Preconditions.checkArgument(properties.containsKey(ConfigurationKeys.METRICS_REPORTING_INFLUXDB_DATABASE),
"InfluxDB database name is missing.");
} catch (IllegalArgumentException exception) {
- throw new MetricReporterException("Missing InfluxDB configuration(s)", exception, ReporterSinkType.INFLUXDB);
+ LOGGER.error("Not reporting to InfluxDB due to missing InfluxDB configuration(s).", exception);
+ return;
}
String url = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_INFLUXDB_URL);
@@ -721,7 +719,7 @@ public class GobblinMetrics {
this.metricContext.getName()) // contains the current job id
.build(properties);
} catch (IOException e) {
- throw new MetricReporterException("Failed to create InfluxDB metrics reporter.", e, ReporterSinkType.INFLUXDB);
+ LOGGER.error("Failed to create InfluxDB metrics reporter. Will not report metrics to InfluxDB.", e);
}
}
@@ -737,7 +735,7 @@ public class GobblinMetrics {
this.codahaleScheduledReporters.add(this.codahaleReportersCloser.register(eventReporter));
}
catch (IOException e) {
- throw new EventReporterException("Failed to create InfluxDB event reporter.", e, ReporterSinkType.INFLUXDB);
+ LOGGER.error("Failed to create InfluxDB event reporter. Will not report events to InfluxDB.", e);
}
}
}
@@ -747,8 +745,7 @@ public class GobblinMetrics {
* {@link org.apache.gobblin.configuration.ConfigurationKeys#METRICS_CUSTOM_BUILDERS}. This allows users to specify custom
* reporters for Gobblin runtime without having to modify the code.
*/
- private void buildCustomMetricReporters(Properties properties)
- throws MetricReporterException, EventReporterException {
+ private void buildCustomMetricReporters(Properties properties) {
String reporterClasses = properties.getProperty(ConfigurationKeys.METRICS_CUSTOM_BUILDERS);
if (Strings.isNullOrEmpty(reporterClasses)) {
@@ -760,8 +757,7 @@ public class GobblinMetrics {
}
}
- private void buildScheduledReporter(Properties properties, String reporterClass, Optional<String> reporterSink)
- throws MetricReporterException, EventReporterException {
+ private void buildScheduledReporter(Properties properties, String reporterClass, Optional<String> reporterSink) {
try {
Class<?> clazz = Class.forName(reporterClass);
@@ -783,21 +779,19 @@ public class GobblinMetrics {
customReporterFactory.newScheduledReporter(properties);
LOGGER.info("Will start reporting metrics using " + reporterClass);
} else {
- throw new MetricReporterException("Class " + reporterClass +
+ throw new IllegalArgumentException("Class " + reporterClass +
" specified by key " + ConfigurationKeys.METRICS_CUSTOM_BUILDERS + " must implement: "
- + CustomCodahaleReporterFactory.class + " or " + CustomReporterFactory.class, ReporterSinkType.CUSTOM);
+ + CustomCodahaleReporterFactory.class + " or " + CustomReporterFactory.class);
}
} catch (ClassNotFoundException exception) {
- throw new MetricReporterException(String
+ LOGGER.warn(String
.format("Failed to create metric reporter: requested CustomReporterFactory %s not found.", reporterClass),
- exception, ReporterSinkType.CUSTOM);
+ exception);
} catch (NoSuchMethodException exception) {
- throw new MetricReporterException(String.format("Failed to create metric reporter: requested CustomReporterFactory %s "
- + "does not have parameterless constructor.", reporterClass), exception, ReporterSinkType.CUSTOM);
- } catch (EventReporterException exception) {
- throw exception;
+ LOGGER.warn(String.format("Failed to create metric reporter: requested CustomReporterFactory %s "
+ + "does not have parameterless constructor.", reporterClass), exception);
} catch (Exception exception) {
- throw new MetricReporterException("Could not create metric reporter from builder " + reporterClass + ".", exception, ReporterSinkType.CUSTOM);
+ LOGGER.warn("Could not create metric reporter from builder " + reporterClass + ".", exception);
}
}
}
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/MetricReporterException.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/MetricReporterException.java
deleted file mode 100644
index 8631df3..0000000
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/MetricReporterException.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gobblin.metrics;
-
-import java.io.IOException;
-
-import lombok.Getter;
-
-public class MetricReporterException extends IOException {
- @Getter
- private final ReporterSinkType type;
-
- public MetricReporterException(Throwable t, ReporterSinkType type) {
- super(t);
- this.type = type;
- }
-
- public MetricReporterException(String message, ReporterSinkType type) {
- super(message);
- this.type = type;
- }
-
- public MetricReporterException(String message, Throwable t, ReporterSinkType type) {
- super(message, t);
- this.type = type;
- }
-}
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ReporterSinkType.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ReporterSinkType.java
deleted file mode 100644
index ab5398e..0000000
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ReporterSinkType.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gobblin.metrics;
-
-public enum ReporterSinkType {
- GRAPHITE,
- FILE,
- KAFKA,
- INFLUXDB,
- CONSOLE,
- CUSTOM
-}
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
index 40f9438..4a357c8 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
@@ -22,74 +22,21 @@ import java.util.Properties;
import org.testng.Assert;
import org.testng.annotations.Test;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValueFactory;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.util.ConfigUtils;
-
@Test
public class GobblinMetricsTest {
+
/**
* Test the {@link GobblinMetrics} instance is removed from {@link GobblinMetricsRegistry} when
* it stops metrics reporting
*/
- public void testStopReportingMetrics()
- throws MetricReporterException, EventReporterException {
+ public void testStopReportingMetrics() {
String id = getClass().getSimpleName() + "-" + System.currentTimeMillis();
GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
-
- Properties properties = new Properties();
- properties.put(ConfigurationKeys.FAILURE_REPORTING_FILE_ENABLED_KEY, "false");
-
- gobblinMetrics.startMetricReporting(properties);
+ gobblinMetrics.startMetricReporting(new Properties());
Assert.assertEquals(GobblinMetricsRegistry.getInstance().get(id).get(), gobblinMetrics);
gobblinMetrics.stopMetricsReporting();
Assert.assertFalse(GobblinMetricsRegistry.getInstance().get(id).isPresent());
}
-
- public void testMetricFileReporterThrowsException() {
- String id = getClass().getSimpleName() + "-" + System.currentTimeMillis();
- GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
-
- //Enable file reporter without specifying metrics.log.dir.
- Config config = ConfigFactory.empty()
- .withValue(ConfigurationKeys.METRICS_REPORTING_FILE_ENABLED_KEY, ConfigValueFactory.fromAnyRef(true));
-
- Properties properties = ConfigUtils.configToProperties(config);
- //Ensure MetricReporterException is thrown
- try {
- gobblinMetrics.startMetricReporting(properties);
- Assert.fail("Metric reporting unexpectedly succeeded.");
- } catch (MetricReporterException e) {
- //Do nothing. Expected to be here.
- } catch (EventReporterException e) {
- Assert.fail("Unexpected exception " + e.getMessage());
- }
- }
-
- public void testMetricFileReporterSuccessful() {
- String id = getClass().getSimpleName() + "-" + System.currentTimeMillis();
- GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
-
- //Enable file reporter without specifying metrics.log.dir.
- Config config = ConfigFactory.empty()
- .withValue(ConfigurationKeys.METRICS_REPORTING_FILE_ENABLED_KEY, ConfigValueFactory.fromAnyRef(true))
- .withValue(ConfigurationKeys.METRICS_LOG_DIR_KEY, ConfigValueFactory.fromAnyRef("/tmp"))
- .withValue(ConfigurationKeys.FAILURE_LOG_DIR_KEY, ConfigValueFactory.fromAnyRef("/tmp"));
-
- Properties properties = ConfigUtils.configToProperties(config);
- //Ensure MetricReporterException is thrown
- try {
- gobblinMetrics.startMetricReporting(properties);
- } catch (MetricReporterException e) {
- Assert.fail("Unexpected exception " + e.getMessage());
- } catch (EventReporterException e) {
- Assert.fail("Unexpected exception " + e.getMessage());
- }
- }
-
}
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
index 4c64c31..5c12360 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
@@ -29,10 +29,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.CustomCodahaleReporterFactory;
-import org.apache.gobblin.metrics.EventReporterException;
import org.apache.gobblin.metrics.KafkaReportingFormats;
-import org.apache.gobblin.metrics.MetricReporterException;
-import org.apache.gobblin.metrics.ReporterSinkType;
import org.apache.gobblin.metrics.RootMetricContext;
import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
@@ -40,8 +37,7 @@ import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
@Slf4j
public class KafkaReporterFactory implements CustomCodahaleReporterFactory {
@Override
- public ScheduledReporter newScheduledReporter(MetricRegistry registry, Properties properties)
- throws IOException {
+ public ScheduledReporter newScheduledReporter(MetricRegistry registry, Properties properties) {
if (!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY,
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_ENABLED))) {
return null;
@@ -67,7 +63,8 @@ public class KafkaReporterFactory implements CustomCodahaleReporterFactory {
"Kafka metrics brokers missing.");
Preconditions.checkArgument(KafkaReporterUtils.getMetricsTopic(properties).or(eventsTopic).or(defaultTopic).isPresent(), "Kafka topic missing.");
} catch (IllegalArgumentException exception) {
- throw new MetricReporterException("Missing Kafka configuration(s).", exception, ReporterSinkType.KAFKA);
+ log.error("Not reporting metrics to Kafka due to missing Kafka configuration(s).", exception);
+ return null;
}
String brokers = properties.getProperty(ConfigurationKeys.METRICS_KAFKA_BROKERS);
@@ -90,7 +87,7 @@ public class KafkaReporterFactory implements CustomCodahaleReporterFactory {
try {
formatEnum.buildMetricsReporter(brokers, metricsTopic.or(defaultTopic).get(), properties);
} catch (IOException exception) {
- throw new MetricReporterException("Failed to create Kafka metrics reporter.", exception, ReporterSinkType.KAFKA);
+ log.error("Failed to create Kafka metrics reporter. Will not report metrics to Kafka.", exception);
}
}
@@ -118,7 +115,7 @@ public class KafkaReporterFactory implements CustomCodahaleReporterFactory {
return reporter;
} catch (IOException exception) {
- throw new EventReporterException("Failed to create Kafka events reporter.", exception, ReporterSinkType.KAFKA);
+ log.error("Failed to create Kafka events reporter. Will not report events to Kafka.", exception);
}
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index b509f09..c649595 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -73,9 +73,7 @@ import org.apache.gobblin.configuration.State;
import org.apache.gobblin.fsm.FiniteStateMachine;
import org.apache.gobblin.metastore.FsStateStore;
import org.apache.gobblin.metastore.StateStore;
-import org.apache.gobblin.metrics.EventReporterException;
import org.apache.gobblin.metrics.GobblinMetrics;
-import org.apache.gobblin.metrics.MetricReporterException;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.CountEventBuilder;
import org.apache.gobblin.metrics.event.JobEvent;
@@ -785,14 +783,8 @@ public class MRJobLauncher extends AbstractJobLauncher {
if (Boolean.valueOf(
configuration.get(ConfigurationKeys.METRICS_ENABLED_KEY, ConfigurationKeys.DEFAULT_METRICS_ENABLED))) {
this.jobMetrics = Optional.of(JobMetrics.get(this.jobState));
- try {
- this.jobMetrics.get()
- .startMetricReportingWithFileSuffix(gobblinJobState, context.getTaskAttemptID().toString());
- } catch (MetricReporterException e) {
- LOG.error("Failed to start {} metric reporter.", e.getType().name(), e);
- } catch (EventReporterException e) {
- LOG.error("Failed to start {} event reporter.", e.getType().name(), e);
- }
+ this.jobMetrics.get()
+ .startMetricReportingWithFileSuffix(gobblinJobState, context.getTaskAttemptID().toString());
}
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/MetricsReportingService.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/MetricsReportingService.java
index 2456b66..43b53cd 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/MetricsReportingService.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/MetricsReportingService.java
@@ -21,17 +21,12 @@ import java.util.Properties;
import com.google.common.util.concurrent.AbstractIdleService;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.gobblin.metrics.EventReporterException;
import org.apache.gobblin.metrics.GobblinMetrics;
-import org.apache.gobblin.metrics.MetricReporterException;
/**
* A {@link com.google.common.util.concurrent.Service} for handling life cycle events around {@link GobblinMetrics}.
*/
-@Slf4j
public class MetricsReportingService extends AbstractIdleService {
private final Properties properties;
@@ -44,13 +39,7 @@ public class MetricsReportingService extends AbstractIdleService {
@Override
protected void startUp() throws Exception {
- try {
- GobblinMetrics.get(this.appId).startMetricReporting(this.properties);
- } catch (MetricReporterException e) {
- log.error("Failed to start {} metric reporter", e.getType().name(), e);
- } catch (EventReporterException e) {
- log.error("Failed to start {} event reporter", e.getType().name(), e);
- }
+ GobblinMetrics.get(this.appId).startMetricReporting(this.properties);
}
@Override
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 5e1946e..3f127eb 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -99,9 +99,7 @@ import org.apache.gobblin.cluster.GobblinClusterUtils;
import org.apache.gobblin.cluster.HelixUtils;
import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.metrics.EventReporterException;
import org.apache.gobblin.metrics.GobblinMetrics;
-import org.apache.gobblin.metrics.MetricReporterException;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.util.ConfigUtils;
@@ -404,13 +402,7 @@ public class YarnService extends AbstractIdleService {
// Intialize Gobblin metrics and start reporters
GobblinMetrics gobblinMetrics = GobblinMetrics.get(this.applicationId, null, tags.build());
- try {
- gobblinMetrics.startMetricReporting(ConfigUtils.configToProperties(config));
- } catch (MetricReporterException e) {
- LOGGER.error("Failed to start {} metric reporter.", e.getType().name(), e);
- } catch (EventReporterException e) {
- LOGGER.error("Failed to start {} event reporter.", e.getType().name(), e);
- }
+ gobblinMetrics.startMetricReporting(ConfigUtils.configToProperties(config));
return gobblinMetrics;
}