You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/05/11 17:06:43 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1127] Provide an option to make metric reporting instantiatio…
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2391157 [GOBBLIN-1127] Provide an option to make metric reporting instantiatio…
2391157 is described below
commit 239115778a08590d7ab5dfa32334efabe0e4fb49
Author: sv2000 <su...@gmail.com>
AuthorDate: Mon May 11 10:04:28 2020 -0700
[GOBBLIN-1127] Provide an option to make metric reporting instantiatio…
Closes #2967 from sv2000/metricReportFatal
---
.../cluster/GobblinClusterConfigurationKeys.java | 7 ++
.../apache/gobblin/cluster/GobblinTaskRunner.java | 40 ++++++++--
.../gobblin/cluster/GobblinTaskRunnerTest.java | 19 +++++
.../gobblin/compaction/mapreduce/MRCompactor.java | 10 ++-
.../gobblin/metrics/EventReporterException.java} | 35 +++++----
.../org/apache/gobblin/metrics/GobblinMetrics.java | 86 ++++++++++++----------
.../gobblin/metrics/MetricReporterException.java} | 33 ++++-----
.../apache/gobblin/metrics/ReporterSinkType.java} | 30 ++------
.../apache/gobblin/metrics/GobblinMetricsTest.java | 59 ++++++++++++++-
.../metrics/kafka/KafkaReporterFactory.java | 13 ++--
.../gobblin/runtime/mapreduce/MRJobLauncher.java | 12 ++-
.../runtime/services/MetricsReportingService.java | 13 +++-
.../java/org/apache/gobblin/yarn/YarnService.java | 10 ++-
13 files changed, 251 insertions(+), 116 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 128a5d6..b545928 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -197,4 +197,11 @@ public class GobblinClusterConfigurationKeys {
public static final String CONTAINER_ID_KEY = GOBBLIN_HELIX_PREFIX + "containerId";
public static final String GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX = GOBBLIN_CLUSTER_PREFIX + "sysProps";
+
+ //Gobblin TaskRunner configuration keys
+ public static final String GOBBLIN_CLUSTER_TASK_RUNNER_METRIC_REPORTING_FAILURE_FATAL = "gobblin.cluster.taskRunner.isMetricReportingFailureFatal";
+ public static final boolean DEFAULT_GOBBLIN_CLUSTER_TASK_RUNNER_METRIC_REPORTING_FAILURE_FATAL = false;
+
+ public static final String GOBBLIN_CLUSTER_TASK_RUNNER_EVENT_REPORTING_FAILURE_FATAL = "gobblin.cluster.taskRunner.isEventReportingFailureFatal";
+ public static final boolean DEFAULT_GOBBLIN_CLUSTER_TASK_RUNNER_EVENT_REPORTING_FAILURE_FATAL = false;
}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 8e6cdd1..6a8fed4 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -83,7 +83,9 @@ import lombok.Getter;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.EventReporterException;
import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricReporterException;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.FileUtils;
@@ -151,6 +153,8 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
protected final FileSystem fs;
protected final String applicationName;
protected final String applicationId;
+ private final boolean isMetricReportingFailureFatal;
+ private final boolean isEventReportingFailureFatal;
public GobblinTaskRunner(String applicationName,
String helixInstanceName,
@@ -170,6 +174,15 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
this.appWorkPath = initAppWorkDir(config, appWorkDirOptional);
this.clusterConfig = saveConfigToFile(config);
this.clusterName = this.clusterConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+
+ this.isMetricReportingFailureFatal = ConfigUtils.getBoolean(this.clusterConfig,
+ GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_TASK_RUNNER_METRIC_REPORTING_FAILURE_FATAL,
+ GobblinClusterConfigurationKeys.DEFAULT_GOBBLIN_CLUSTER_TASK_RUNNER_METRIC_REPORTING_FAILURE_FATAL);
+
+ this.isEventReportingFailureFatal = ConfigUtils.getBoolean(this.clusterConfig,
+ GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_TASK_RUNNER_EVENT_REPORTING_FAILURE_FATAL,
+ GobblinClusterConfigurationKeys.DEFAULT_GOBBLIN_CLUSTER_TASK_RUNNER_EVENT_REPORTING_FAILURE_FATAL);
+
logger.info("Configured GobblinTaskRunner work dir to: {}", this.appWorkPath.toString());
// Set system properties passed in via application config. As an example, Helix uses System#getProperty() for ZK configuration
@@ -308,11 +321,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
addInstanceTags();
// Start metric reporting
- if (this.containerMetrics.isPresent()) {
- this.containerMetrics.get()
- .startMetricReportingWithFileSuffix(ConfigUtils.configToState(this.clusterConfig),
- this.taskRunnerId);
- }
+ initMetricReporter();
if (this.serviceManager != null) {
this.serviceManager.startAsync();
@@ -323,6 +332,27 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
}
}
+ private void initMetricReporter() { // Starts container metric reporting when container metrics are present.
+ if (this.containerMetrics.isPresent()) {
+ try {
+ this.containerMetrics.get()
+ .startMetricReportingWithFileSuffix(ConfigUtils.configToState(this.clusterConfig), this.taskRunnerId);
+ } catch (MetricReporterException e) {
+ if (this.isMetricReportingFailureFatal) { // rethrow only when metric reporting failures are configured as fatal
+ Throwables.propagate(e);
+ } else {
+ logger.error("Failed to start {} metric reporter", e.getType().name(), e);
+ }
+ } catch (EventReporterException e) {
+ if (this.isEventReportingFailureFatal) { // rethrow only when event reporting failures are configured as fatal
+ Throwables.propagate(e);
+ } else {
+ logger.error("Failed to start {} event reporter", e.getType().name(), e);
+ }
+ }
+ }
+ }
+
public synchronized void stop() {
if (this.isStopped) {
logger.info("Gobblin Task runner is already stopped.");
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
index 98d13df..ee76ede 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
@@ -43,6 +43,7 @@ import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.cluster.suite.IntegrationBasicSuite;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.testing.AssertWithBackoff;
@@ -71,6 +72,7 @@ public class GobblinTaskRunnerTest {
private GobblinClusterManager gobblinClusterManager;
private GobblinTaskRunner corruptGobblinTaskRunner;
+ private GobblinTaskRunner gobblinTaskRunnerFailedReporter;
private String clusterName;
private String corruptHelixInstance;
private TaskAssignmentAfterConnectionRetry suite;
@@ -103,6 +105,18 @@ public class GobblinTaskRunnerTest {
TestHelper.TEST_APPLICATION_ID, TestHelper.TEST_TASK_RUNNER_ID, config, Optional.<Path>absent());
this.gobblinTaskRunner.connectHelixManager();
+ // Participant that fails to start due to metric reporter failures
+ String instanceName = HelixUtils.getHelixInstanceName("MetricReporterFailureInstance", 0);
+
+ Config metricConfig = config.withValue(ConfigurationKeys.METRICS_ENABLED_KEY, ConfigValueFactory.fromAnyRef(true))
+ .withValue(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY, ConfigValueFactory.fromAnyRef(true))
+ .withValue(ConfigurationKeys.METRICS_KAFKA_TOPIC_METRICS, ConfigValueFactory.fromAnyRef("metricTopic"))
+ .withValue(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_TASK_RUNNER_METRIC_REPORTING_FAILURE_FATAL, ConfigValueFactory.fromAnyRef(true));
+
+ this.gobblinTaskRunnerFailedReporter =
+ new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, instanceName,
+ TestHelper.TEST_APPLICATION_ID, TestHelper.TEST_TASK_RUNNER_ID, metricConfig, Optional.<Path>absent());
+
// Participant with a partial Instance set up on Helix/ZK
this.corruptHelixInstance = HelixUtils.getHelixInstanceName("CorruptHelixInstance", 0);
this.corruptGobblinTaskRunner =
@@ -142,6 +156,11 @@ public class GobblinTaskRunnerTest {
}, "gobblinTaskRunner stopped");
}
+ @Test (expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = ".*Could not create metric reporter.*")
+ public void testStartUpFailsDueToMetricReporterFailure() {
+ GobblinTaskRunnerTest.this.gobblinTaskRunnerFailedReporter.start();
+ }
+
@Test
public void testBuildFileSystemConfig() {
FileSystem fileSystem = this.gobblinTaskRunner.getFs();
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
index 5a937ed..78feb64 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
@@ -68,7 +68,9 @@ import org.apache.gobblin.compaction.verify.DataCompletenessVerifier;
import org.apache.gobblin.compaction.verify.DataCompletenessVerifier.Results;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.EventReporterException;
import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricReporterException;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.util.ClassAliasResolver;
@@ -364,7 +366,13 @@ public class MRCompactor implements Compactor {
tags.addAll(Tag.fromMap(ClusterNameTags.getClusterNameTags()));
GobblinMetrics gobblinMetrics =
GobblinMetrics.get(this.state.getProp(ConfigurationKeys.JOB_NAME_KEY), null, tags.build());
- gobblinMetrics.startMetricReporting(this.state.getProperties());
+ try {
+ gobblinMetrics.startMetricReporting(this.state.getProperties());
+ } catch (MetricReporterException e) {
+ LOG.error("Failed to start {} metric reporter.", e.getType().name(), e);
+ } catch (EventReporterException e) {
+ LOG.error("Failed to start {} event reporter.", e.getType().name(), e);
+ }
return gobblinMetrics;
}
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/EventReporterException.java
similarity index 53%
copy from gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
copy to gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/EventReporterException.java
index 4a357c8..7804000 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/EventReporterException.java
@@ -14,29 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.gobblin.metrics;
-import java.util.Properties;
+import java.io.IOException;
-import org.testng.Assert;
-import org.testng.annotations.Test;
+import lombok.Getter;
+public class EventReporterException extends IOException {
+ @Getter
+ private final ReporterSinkType type;
-@Test
-public class GobblinMetricsTest {
+ public EventReporterException(Throwable t, ReporterSinkType type) {
+ super(t);
+ this.type = type;
+ }
- /**
- * Test the {@link GobblinMetrics} instance is removed from {@link GobblinMetricsRegistry} when
- * it stops metrics reporting
- */
- public void testStopReportingMetrics() {
- String id = getClass().getSimpleName() + "-" + System.currentTimeMillis();
- GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
- gobblinMetrics.startMetricReporting(new Properties());
- Assert.assertEquals(GobblinMetricsRegistry.getInstance().get(id).get(), gobblinMetrics);
+ public EventReporterException(String message, ReporterSinkType type) {
+ super(message);
+ this.type = type;
+ }
- gobblinMetrics.stopMetricsReporting();
- Assert.assertFalse(GobblinMetricsRegistry.getInstance().get(id).isPresent());
+ public EventReporterException(String message, Throwable t, ReporterSinkType type) {
+ super(message, t);
+ this.type = type;
}
-}
+}
\ No newline at end of file
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
index e969083..4af9fde 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
@@ -27,7 +27,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.gobblin.metrics.reporter.FileFailureEventReporter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -48,6 +47,8 @@ import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import com.typesafe.config.Config;
+import lombok.Getter;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metrics.graphite.GraphiteConnectionType;
@@ -56,14 +57,13 @@ import org.apache.gobblin.metrics.graphite.GraphiteReporter;
import org.apache.gobblin.metrics.influxdb.InfluxDBConnectionType;
import org.apache.gobblin.metrics.influxdb.InfluxDBEventReporter;
import org.apache.gobblin.metrics.influxdb.InfluxDBReporter;
+import org.apache.gobblin.metrics.reporter.FileFailureEventReporter;
import org.apache.gobblin.metrics.reporter.OutputStreamEventReporter;
import org.apache.gobblin.metrics.reporter.OutputStreamReporter;
import org.apache.gobblin.metrics.reporter.ScheduledReporter;
import org.apache.gobblin.password.PasswordManager;
import org.apache.gobblin.util.PropertiesUtils;
-import lombok.Getter;
-
/**
* A class that represents a set of metrics associated with a given name.
@@ -354,7 +354,8 @@ public class GobblinMetrics {
* Starts metric reporting and appends the given metrics file suffix to the current value of
* {@link ConfigurationKeys#METRICS_FILE_SUFFIX}.
*/
- public void startMetricReportingWithFileSuffix(State state, String metricsFileSuffix) {
+ public void startMetricReportingWithFileSuffix(State state, String metricsFileSuffix)
+ throws MetricReporterException, EventReporterException {
Properties metricsReportingProps = new Properties();
metricsReportingProps.putAll(state.getProperties());
@@ -374,7 +375,8 @@ public class GobblinMetrics {
*
* @param configuration configuration properties
*/
- public void startMetricReporting(Configuration configuration) {
+ public void startMetricReporting(Configuration configuration)
+ throws MetricReporterException, EventReporterException {
Properties props = new Properties();
for (Map.Entry<String, String> entry : configuration) {
props.put(entry.getKey(), entry.getValue());
@@ -387,7 +389,8 @@ public class GobblinMetrics {
*
* @param properties configuration properties
*/
- public void startMetricReporting(Properties properties) {
+ public void startMetricReporting(Properties properties)
+ throws MetricReporterException, EventReporterException {
if (this.metricsReportingStarted) {
LOGGER.warn("Metric reporting has already started");
return;
@@ -473,7 +476,8 @@ public class GobblinMetrics {
LOGGER.info("Metrics reporting stopped successfully");
}
- private void buildFileMetricReporter(Properties properties) {
+ private void buildFileMetricReporter(Properties properties)
+ throws MetricReporterException {
if (!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_FILE_ENABLED_KEY,
ConfigurationKeys.DEFAULT_METRICS_REPORTING_FILE_ENABLED))) {
return;
@@ -481,9 +485,8 @@ public class GobblinMetrics {
LOGGER.info("Reporting metrics to log files");
if (!properties.containsKey(ConfigurationKeys.METRICS_LOG_DIR_KEY)) {
- LOGGER.error(
- "Not reporting metrics to log files because " + ConfigurationKeys.METRICS_LOG_DIR_KEY + " is undefined");
- return;
+ throw new MetricReporterException(
+ "Not reporting metrics to log files because " + ConfigurationKeys.METRICS_LOG_DIR_KEY + " is undefined", ReporterSinkType.FILE);
}
try {
@@ -493,8 +496,7 @@ public class GobblinMetrics {
// Each job gets its own metric log subdirectory
Path metricsLogDir = new Path(properties.getProperty(ConfigurationKeys.METRICS_LOG_DIR_KEY), this.getName());
if (!fs.exists(metricsLogDir) && !fs.mkdirs(metricsLogDir)) {
- LOGGER.error("Failed to create metric log directory for metrics " + this.getName());
- return;
+ throw new MetricReporterException("Failed to create metric log directory for metrics " + this.getName(), ReporterSinkType.FILE);
}
// Add a suffix to file name if specified in properties.
@@ -523,11 +525,12 @@ public class GobblinMetrics {
LOGGER.info("Will start reporting metrics to directory " + metricsLogDir);
} catch (IOException ioe) {
- LOGGER.error("Failed to build file metric reporter for job " + this.id, ioe);
+ throw new MetricReporterException("Failed to build file metric reporter for job " + this.id, ioe, ReporterSinkType.FILE);
}
}
- private void buildFileFailureEventReporter(Properties properties) {
+ private void buildFileFailureEventReporter(Properties properties)
+ throws EventReporterException {
if (!Boolean.valueOf(properties.getProperty(ConfigurationKeys.FAILURE_REPORTING_FILE_ENABLED_KEY,
ConfigurationKeys.DEFAULT_FAILURE_REPORTING_FILE_ENABLED))) {
return;
@@ -535,9 +538,8 @@ public class GobblinMetrics {
LOGGER.info("Reporting failure to log files");
if (!properties.containsKey(ConfigurationKeys.FAILURE_LOG_DIR_KEY)) {
- LOGGER.error(
- "Not reporting failure to log files because " + ConfigurationKeys.FAILURE_LOG_DIR_KEY + " is undefined");
- return;
+ throw new EventReporterException(
+ "Not reporting failure to log files because " + ConfigurationKeys.FAILURE_LOG_DIR_KEY + " is undefined", ReporterSinkType.FILE);
}
try {
@@ -547,8 +549,7 @@ public class GobblinMetrics {
// Each job gets its own log subdirectory
Path failureLogDir = new Path(properties.getProperty(ConfigurationKeys.FAILURE_LOG_DIR_KEY), this.getName());
if (!fs.exists(failureLogDir) && !fs.mkdirs(failureLogDir)) {
- LOGGER.error("Failed to create failure log directory for metrics " + this.getName());
- return;
+ throw new EventReporterException("Failed to create failure log directory for metrics " + this.getName(), ReporterSinkType.FILE);
}
// Add a suffix to file name if specified in properties.
@@ -566,7 +567,7 @@ public class GobblinMetrics {
LOGGER.info("Will start reporting failure to directory " + failureLogDir);
} catch (IOException ioe) {
- LOGGER.error("Failed to build file failure event reporter for job " + this.id, ioe);
+ throw new EventReporterException("Failed to build file failure event reporter for job " + this.id, ioe, ReporterSinkType.FILE);
}
}
@@ -581,7 +582,8 @@ public class GobblinMetrics {
convertRatesTo(TimeUnit.SECONDS).convertDurationsTo(TimeUnit.MILLISECONDS).build()));
}
- private void buildKafkaMetricReporter(Properties properties) {
+ private void buildKafkaMetricReporter(Properties properties)
+ throws MetricReporterException, EventReporterException {
if (!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY,
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_ENABLED))) {
return;
@@ -589,7 +591,8 @@ public class GobblinMetrics {
buildScheduledReporter(properties, ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_REPORTER_CLASS, Optional.of("Kafka"));
}
- private void buildGraphiteMetricReporter(Properties properties) {
+ private void buildGraphiteMetricReporter(Properties properties)
+ throws MetricReporterException, EventReporterException {
boolean metricsEnabled = PropertiesUtils
.getPropAsBoolean(properties, ConfigurationKeys.METRICS_REPORTING_GRAPHITE_METRICS_ENABLED_KEY,
ConfigurationKeys.DEFAULT_METRICS_REPORTING_GRAPHITE_METRICS_ENABLED);
@@ -612,8 +615,7 @@ public class GobblinMetrics {
Preconditions.checkArgument(properties.containsKey(ConfigurationKeys.METRICS_REPORTING_GRAPHITE_HOSTNAME),
"Graphite hostname is missing.");
} catch (IllegalArgumentException exception) {
- LOGGER.error("Not reporting to Graphite due to missing Graphite configuration(s).", exception);
- return;
+ throw new MetricReporterException("Missing Graphite configuration(s).", exception, ReporterSinkType.GRAPHITE);
}
String hostname = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_GRAPHITE_HOSTNAME);
@@ -641,7 +643,7 @@ public class GobblinMetrics {
.withMetricsPrefix(prefix)
.build(properties);
} catch (IOException e) {
- LOGGER.error("Failed to create Graphite metrics reporter. Will not report metrics to Graphite.", e);
+ throw new MetricReporterException("Failed to create Graphite metrics reporter.", e, ReporterSinkType.GRAPHITE);
}
}
@@ -663,12 +665,13 @@ public class GobblinMetrics {
this.codahaleScheduledReporters.add(this.codahaleReportersCloser.register(eventReporter));
}
catch (IOException e) {
- LOGGER.error("Failed to create Graphite event reporter. Will not report events to Graphite.", e);
+ throw new EventReporterException("Failed to create Graphite event reporter.", e, ReporterSinkType.GRAPHITE);
}
}
}
- private void buildInfluxDBMetricReporter(Properties properties) {
+ private void buildInfluxDBMetricReporter(Properties properties)
+ throws MetricReporterException, EventReporterException {
boolean metricsEnabled = PropertiesUtils
.getPropAsBoolean(properties, ConfigurationKeys.METRICS_REPORTING_INFLUXDB_METRICS_ENABLED_KEY,
ConfigurationKeys.DEFAULT_METRICS_REPORTING_INFLUXDB_METRICS_ENABLED);
@@ -691,8 +694,7 @@ public class GobblinMetrics {
Preconditions.checkArgument(properties.containsKey(ConfigurationKeys.METRICS_REPORTING_INFLUXDB_DATABASE),
"InfluxDB database name is missing.");
} catch (IllegalArgumentException exception) {
- LOGGER.error("Not reporting to InfluxDB due to missing InfluxDB configuration(s).", exception);
- return;
+ throw new MetricReporterException("Missing InfluxDB configuration(s)", exception, ReporterSinkType.INFLUXDB);
}
String url = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_INFLUXDB_URL);
@@ -719,7 +721,7 @@ public class GobblinMetrics {
this.metricContext.getName()) // contains the current job id
.build(properties);
} catch (IOException e) {
- LOGGER.error("Failed to create InfluxDB metrics reporter. Will not report metrics to InfluxDB.", e);
+ throw new MetricReporterException("Failed to create InfluxDB metrics reporter.", e, ReporterSinkType.INFLUXDB);
}
}
@@ -735,7 +737,7 @@ public class GobblinMetrics {
this.codahaleScheduledReporters.add(this.codahaleReportersCloser.register(eventReporter));
}
catch (IOException e) {
- LOGGER.error("Failed to create InfluxDB event reporter. Will not report events to InfluxDB.", e);
+ throw new EventReporterException("Failed to create InfluxDB event reporter.", e, ReporterSinkType.INFLUXDB);
}
}
}
@@ -745,7 +747,8 @@ public class GobblinMetrics {
* {@link org.apache.gobblin.configuration.ConfigurationKeys#METRICS_CUSTOM_BUILDERS}. This allows users to specify custom
* reporters for Gobblin runtime without having to modify the code.
*/
- private void buildCustomMetricReporters(Properties properties) {
+ private void buildCustomMetricReporters(Properties properties)
+ throws MetricReporterException, EventReporterException {
String reporterClasses = properties.getProperty(ConfigurationKeys.METRICS_CUSTOM_BUILDERS);
if (Strings.isNullOrEmpty(reporterClasses)) {
@@ -757,7 +760,8 @@ public class GobblinMetrics {
}
}
- private void buildScheduledReporter(Properties properties, String reporterClass, Optional<String> reporterSink) {
+ private void buildScheduledReporter(Properties properties, String reporterClass, Optional<String> reporterSink)
+ throws MetricReporterException, EventReporterException {
try {
Class<?> clazz = Class.forName(reporterClass);
@@ -779,19 +783,21 @@ public class GobblinMetrics {
customReporterFactory.newScheduledReporter(properties);
LOGGER.info("Will start reporting metrics using " + reporterClass);
} else {
- throw new IllegalArgumentException("Class " + reporterClass +
+ throw new MetricReporterException("Class " + reporterClass +
" specified by key " + ConfigurationKeys.METRICS_CUSTOM_BUILDERS + " must implement: "
- + CustomCodahaleReporterFactory.class + " or " + CustomReporterFactory.class);
+ + CustomCodahaleReporterFactory.class + " or " + CustomReporterFactory.class, ReporterSinkType.CUSTOM);
}
} catch (ClassNotFoundException exception) {
- LOGGER.warn(String
+ throw new MetricReporterException(String
.format("Failed to create metric reporter: requested CustomReporterFactory %s not found.", reporterClass),
- exception);
+ exception, ReporterSinkType.CUSTOM);
} catch (NoSuchMethodException exception) {
- LOGGER.warn(String.format("Failed to create metric reporter: requested CustomReporterFactory %s "
- + "does not have parameterless constructor.", reporterClass), exception);
+ throw new MetricReporterException(String.format("Failed to create metric reporter: requested CustomReporterFactory %s "
+ + "does not have parameterless constructor.", reporterClass), exception, ReporterSinkType.CUSTOM);
+ } catch (EventReporterException exception) {
+ throw exception;
} catch (Exception exception) {
- LOGGER.warn("Could not create metric reporter from builder " + reporterClass + ".", exception);
+ throw new MetricReporterException("Could not create metric reporter from builder " + reporterClass + ".", exception, ReporterSinkType.CUSTOM);
}
}
}
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/MetricReporterException.java
similarity index 53%
copy from gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
copy to gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/MetricReporterException.java
index 4a357c8..8631df3 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/MetricReporterException.java
@@ -14,29 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.gobblin.metrics;
-import java.util.Properties;
+import java.io.IOException;
-import org.testng.Assert;
-import org.testng.annotations.Test;
+import lombok.Getter;
+public class MetricReporterException extends IOException {
+ @Getter
+ private final ReporterSinkType type;
-@Test
-public class GobblinMetricsTest {
+ public MetricReporterException(Throwable t, ReporterSinkType type) {
+ super(t);
+ this.type = type;
+ }
- /**
- * Test the {@link GobblinMetrics} instance is removed from {@link GobblinMetricsRegistry} when
- * it stops metrics reporting
- */
- public void testStopReportingMetrics() {
- String id = getClass().getSimpleName() + "-" + System.currentTimeMillis();
- GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
- gobblinMetrics.startMetricReporting(new Properties());
- Assert.assertEquals(GobblinMetricsRegistry.getInstance().get(id).get(), gobblinMetrics);
+ public MetricReporterException(String message, ReporterSinkType type) {
+ super(message);
+ this.type = type;
+ }
- gobblinMetrics.stopMetricsReporting();
- Assert.assertFalse(GobblinMetricsRegistry.getInstance().get(id).isPresent());
+ public MetricReporterException(String message, Throwable t, ReporterSinkType type) {
+ super(message, t);
+ this.type = type;
}
}
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ReporterSinkType.java
similarity index 53%
copy from gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
copy to gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ReporterSinkType.java
index 4a357c8..ab5398e 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ReporterSinkType.java
@@ -14,29 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.gobblin.metrics;
-import java.util.Properties;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-
-@Test
-public class GobblinMetricsTest {
-
- /**
- * Test the {@link GobblinMetrics} instance is removed from {@link GobblinMetricsRegistry} when
- * it stops metrics reporting
- */
- public void testStopReportingMetrics() {
- String id = getClass().getSimpleName() + "-" + System.currentTimeMillis();
- GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
- gobblinMetrics.startMetricReporting(new Properties());
- Assert.assertEquals(GobblinMetricsRegistry.getInstance().get(id).get(), gobblinMetrics);
-
- gobblinMetrics.stopMetricsReporting();
- Assert.assertFalse(GobblinMetricsRegistry.getInstance().get(id).isPresent());
- }
+public enum ReporterSinkType {
+ GRAPHITE,
+ FILE,
+ KAFKA,
+ INFLUXDB,
+ CONSOLE,
+ CUSTOM
}
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
index 4a357c8..40f9438 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/test/java/org/apache/gobblin/metrics/GobblinMetricsTest.java
@@ -22,21 +22,74 @@ import java.util.Properties;
import org.testng.Assert;
import org.testng.annotations.Test;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
@Test
public class GobblinMetricsTest {
-
/**
* Test the {@link GobblinMetrics} instance is removed from {@link GobblinMetricsRegistry} when
* it stops metrics reporting
*/
- public void testStopReportingMetrics() {
+ public void testStopReportingMetrics()
+ throws MetricReporterException, EventReporterException {
String id = getClass().getSimpleName() + "-" + System.currentTimeMillis();
GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
- gobblinMetrics.startMetricReporting(new Properties());
+
+ Properties properties = new Properties();
+ properties.put(ConfigurationKeys.FAILURE_REPORTING_FILE_ENABLED_KEY, "false");
+
+ gobblinMetrics.startMetricReporting(properties);
Assert.assertEquals(GobblinMetricsRegistry.getInstance().get(id).get(), gobblinMetrics);
gobblinMetrics.stopMetricsReporting();
Assert.assertFalse(GobblinMetricsRegistry.getInstance().get(id).isPresent());
}
+
+ public void testMetricFileReporterThrowsException() {
+ String id = getClass().getSimpleName() + "-" + System.currentTimeMillis();
+ GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
+
+ //Enable file reporter without specifying metrics.log.dir.
+ Config config = ConfigFactory.empty()
+ .withValue(ConfigurationKeys.METRICS_REPORTING_FILE_ENABLED_KEY, ConfigValueFactory.fromAnyRef(true));
+
+ Properties properties = ConfigUtils.configToProperties(config);
+ //Ensure MetricReporterException is thrown
+ try {
+ gobblinMetrics.startMetricReporting(properties);
+ Assert.fail("Metric reporting unexpectedly succeeded.");
+ } catch (MetricReporterException e) {
+ //Do nothing. Expected to be here.
+ } catch (EventReporterException e) {
+ Assert.fail("Unexpected exception " + e.getMessage());
+ }
+ }
+
+ public void testMetricFileReporterSuccessful() {
+ String id = getClass().getSimpleName() + "-" + System.currentTimeMillis();
+ GobblinMetrics gobblinMetrics = GobblinMetrics.get(id);
+
+ //Enable file reporter and specify both the metrics log dir and the failure log dir.
+ Config config = ConfigFactory.empty()
+ .withValue(ConfigurationKeys.METRICS_REPORTING_FILE_ENABLED_KEY, ConfigValueFactory.fromAnyRef(true))
+ .withValue(ConfigurationKeys.METRICS_LOG_DIR_KEY, ConfigValueFactory.fromAnyRef("/tmp"))
+ .withValue(ConfigurationKeys.FAILURE_LOG_DIR_KEY, ConfigValueFactory.fromAnyRef("/tmp"));
+
+ Properties properties = ConfigUtils.configToProperties(config);
+ //Ensure neither MetricReporterException nor EventReporterException is thrown
+ try {
+ gobblinMetrics.startMetricReporting(properties);
+ } catch (MetricReporterException e) {
+ Assert.fail("Unexpected exception " + e.getMessage());
+ } catch (EventReporterException e) {
+ Assert.fail("Unexpected exception " + e.getMessage());
+ }
+ }
+
}
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
index 5c12360..4c64c31 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
@@ -29,7 +29,10 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.CustomCodahaleReporterFactory;
+import org.apache.gobblin.metrics.EventReporterException;
import org.apache.gobblin.metrics.KafkaReportingFormats;
+import org.apache.gobblin.metrics.MetricReporterException;
+import org.apache.gobblin.metrics.ReporterSinkType;
import org.apache.gobblin.metrics.RootMetricContext;
import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
@@ -37,7 +40,8 @@ import org.apache.gobblin.metrics.reporter.util.KafkaReporterUtils;
@Slf4j
public class KafkaReporterFactory implements CustomCodahaleReporterFactory {
@Override
- public ScheduledReporter newScheduledReporter(MetricRegistry registry, Properties properties) {
+ public ScheduledReporter newScheduledReporter(MetricRegistry registry, Properties properties)
+ throws IOException {
if (!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY,
ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_ENABLED))) {
return null;
@@ -63,8 +67,7 @@ public class KafkaReporterFactory implements CustomCodahaleReporterFactory {
"Kafka metrics brokers missing.");
Preconditions.checkArgument(KafkaReporterUtils.getMetricsTopic(properties).or(eventsTopic).or(defaultTopic).isPresent(), "Kafka topic missing.");
} catch (IllegalArgumentException exception) {
- log.error("Not reporting metrics to Kafka due to missing Kafka configuration(s).", exception);
- return null;
+ throw new MetricReporterException("Missing Kafka configuration(s).", exception, ReporterSinkType.KAFKA);
}
String brokers = properties.getProperty(ConfigurationKeys.METRICS_KAFKA_BROKERS);
@@ -87,7 +90,7 @@ public class KafkaReporterFactory implements CustomCodahaleReporterFactory {
try {
formatEnum.buildMetricsReporter(brokers, metricsTopic.or(defaultTopic).get(), properties);
} catch (IOException exception) {
- log.error("Failed to create Kafka metrics reporter. Will not report metrics to Kafka.", exception);
+ throw new MetricReporterException("Failed to create Kafka metrics reporter.", exception, ReporterSinkType.KAFKA);
}
}
@@ -115,7 +118,7 @@ public class KafkaReporterFactory implements CustomCodahaleReporterFactory {
return reporter;
} catch (IOException exception) {
- log.error("Failed to create Kafka events reporter. Will not report events to Kafka.", exception);
+ throw new EventReporterException("Failed to create Kafka events reporter.", exception, ReporterSinkType.KAFKA);
}
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index c649595..b509f09 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -73,7 +73,9 @@ import org.apache.gobblin.configuration.State;
import org.apache.gobblin.fsm.FiniteStateMachine;
import org.apache.gobblin.metastore.FsStateStore;
import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metrics.EventReporterException;
import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricReporterException;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.CountEventBuilder;
import org.apache.gobblin.metrics.event.JobEvent;
@@ -783,8 +785,14 @@ public class MRJobLauncher extends AbstractJobLauncher {
if (Boolean.valueOf(
configuration.get(ConfigurationKeys.METRICS_ENABLED_KEY, ConfigurationKeys.DEFAULT_METRICS_ENABLED))) {
this.jobMetrics = Optional.of(JobMetrics.get(this.jobState));
- this.jobMetrics.get()
- .startMetricReportingWithFileSuffix(gobblinJobState, context.getTaskAttemptID().toString());
+ try {
+ this.jobMetrics.get()
+ .startMetricReportingWithFileSuffix(gobblinJobState, context.getTaskAttemptID().toString());
+ } catch (MetricReporterException e) {
+ LOG.error("Failed to start {} metric reporter.", e.getType().name(), e);
+ } catch (EventReporterException e) {
+ LOG.error("Failed to start {} event reporter.", e.getType().name(), e);
+ }
}
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/MetricsReportingService.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/MetricsReportingService.java
index 43b53cd..2456b66 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/MetricsReportingService.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/MetricsReportingService.java
@@ -21,12 +21,17 @@ import java.util.Properties;
import com.google.common.util.concurrent.AbstractIdleService;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.EventReporterException;
import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricReporterException;
/**
* A {@link com.google.common.util.concurrent.Service} for handling life cycle events around {@link GobblinMetrics}.
*/
+@Slf4j
public class MetricsReportingService extends AbstractIdleService {
private final Properties properties;
@@ -39,7 +44,13 @@ public class MetricsReportingService extends AbstractIdleService {
@Override
protected void startUp() throws Exception {
- GobblinMetrics.get(this.appId).startMetricReporting(this.properties);
+ try {
+ GobblinMetrics.get(this.appId).startMetricReporting(this.properties);
+ } catch (MetricReporterException e) {
+ log.error("Failed to start {} metric reporter", e.getType().name(), e);
+ } catch (EventReporterException e) {
+ log.error("Failed to start {} event reporter", e.getType().name(), e);
+ }
}
@Override
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 3f127eb..5e1946e 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -99,7 +99,9 @@ import org.apache.gobblin.cluster.GobblinClusterUtils;
import org.apache.gobblin.cluster.HelixUtils;
import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.EventReporterException;
import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricReporterException;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.util.ConfigUtils;
@@ -402,7 +404,13 @@ public class YarnService extends AbstractIdleService {
// Intialize Gobblin metrics and start reporters
GobblinMetrics gobblinMetrics = GobblinMetrics.get(this.applicationId, null, tags.build());
- gobblinMetrics.startMetricReporting(ConfigUtils.configToProperties(config));
+ try {
+ gobblinMetrics.startMetricReporting(ConfigUtils.configToProperties(config));
+ } catch (MetricReporterException e) {
+ LOGGER.error("Failed to start {} metric reporter.", e.getType().name(), e);
+ } catch (EventReporterException e) {
+ LOGGER.error("Failed to start {} event reporter.", e.getType().name(), e);
+ }
return gobblinMetrics;
}