You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/12/07 23:28:24 UTC
incubator-gobblin git commit: [GOBBLIN-326] Add more metrics for GaaS and Gobblin Cluster
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 35b894c2c -> 1be745524
[GOBBLIN-326] Add more metrics for GaaS and Gobblin Cluster
Closes #2178 from yukuai518/timer
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/1be74552
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/1be74552
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/1be74552
Branch: refs/heads/master
Commit: 1be7455246d5c35900c8715c32559bd75b6c7bde
Parents: 35b894c
Author: Kuai Yu <ku...@linkedin.com>
Authored: Thu Dec 7 15:28:16 2017 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Dec 7 15:28:16 2017 -0800
----------------------------------------------------------------------
.../gobblin/cluster/GobblinClusterManager.java | 68 +++++++++++++++-
.../cluster/GobblinHelixJobScheduler.java | 85 +++++++++++---------
.../instrumented/StandardMetricsBridge.java | 35 ++++++--
.../gobblin/metrics/ContextAwareHistogram.java | 8 ++
.../metrics/ContextAwareMetricFactory.java | 23 +++++-
.../metrics/ContextAwareMetricFactoryArgs.java | 47 +++++++++++
.../gobblin/metrics/ContextAwareTimer.java | 9 +++
.../apache/gobblin/metrics/InnerHistogram.java | 17 ++++
.../gobblin/metrics/InnerMetricContext.java | 17 ++++
.../org/apache/gobblin/metrics/InnerTimer.java | 14 ++++
.../apache/gobblin/metrics/MetricContext.java | 27 ++++---
.../apache/gobblin/runtime/api/JobCatalog.java | 77 ++++++++++--------
.../runtime/api/JobExecutionLauncher.java | 8 ++
.../gobblin/runtime/api/MutableJobCatalog.java | 43 ++++++++++
.../gobblin/runtime/api/MutableSpecCatalog.java | 42 ++++++++++
.../apache/gobblin/runtime/api/SpecCatalog.java | 85 ++++++++++++++++----
.../runtime/job_catalog/FSJobCatalog.java | 17 +++-
.../runtime/job_catalog/JobCatalogBase.java | 20 ++++-
.../runtime/job_catalog/StaticJobCatalog.java | 3 +
.../runtime/spec_catalog/FlowCatalog.java | 27 +++++--
.../runtime/spec_catalog/TopologyCatalog.java | 1 +
.../modules/core/GobblinServiceManager.java | 62 +++++++++++++-
.../scheduler/GobblinServiceJobScheduler.java | 2 +-
23 files changed, 615 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 7948a8a..6b53c6c 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -20,6 +20,8 @@ package org.apache.gobblin.cluster;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -35,6 +37,12 @@ import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.ContextAwareHistogram;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -83,9 +91,9 @@ import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.JvmUtils;
-import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import javax.annotation.Nonnull;
import lombok.Getter;
@@ -112,7 +120,7 @@ import lombok.Getter;
* @author Yinan Li
*/
@Alpha
-public class GobblinClusterManager implements ApplicationLauncher {
+public class GobblinClusterManager implements ApplicationLauncher, StandardMetricsBridge {
private static final Logger LOGGER = LoggerFactory.getLogger(GobblinClusterManager.class);
@@ -148,12 +156,14 @@ public class GobblinClusterManager implements ApplicationLauncher {
private GobblinHelixJobScheduler jobScheduler;
private final String clusterName;
private final Config config;
-
+ private final MetricContext metricContext;
+ private final Metrics metrics;
public GobblinClusterManager(String clusterName, String applicationId, Config config,
Optional<Path> appWorkDirOptional) throws Exception {
this.clusterName = clusterName;
this.config = config;
-
+ this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.getClass());
+ this.metrics = new Metrics(this.metricContext);
this.isStandaloneMode = ConfigUtils.getBoolean(config, GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE_KEY,
GobblinClusterConfigurationKeys.DEFAULT_STANDALONE_CLUSTER_MODE);
@@ -239,6 +249,7 @@ public class GobblinClusterManager implements ApplicationLauncher {
*/
@VisibleForTesting
void handleLeadershipChange(NotificationContext changeContext) {
+ this.metrics.clusterLeadershipChange.update(1);
if (this.helixManager.isLeader()) {
// can get multiple notifications on a leadership change, so only start the application launcher the first time
// the notification is received
@@ -531,6 +542,37 @@ public class GobblinClusterManager implements ApplicationLauncher {
this.applicationLauncher.close();
}
+ @Override
+ public StandardMetrics getStandardMetrics() {
+ return this.metrics;
+ }
+
+ @Nonnull
+ @Override
+ public MetricContext getMetricContext() {
+ return this.metricContext;
+ }
+
+ @Override
+ public boolean isInstrumentationEnabled() {
+ return GobblinMetrics.isEnabled(ConfigUtils.configToProperties(this.config));
+ }
+
+ @Override
+ public List<Tag<?>> generateTags(State state) {
+ return ImmutableList.of();
+ }
+
+ @Override
+ public void switchMetricContext(List<Tag<?>> tags) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void switchMetricContext(MetricContext context) {
+ throw new UnsupportedOperationException();
+ }
+
/**
* A custom implementation of {@link LiveInstanceChangeListener}.
*/
@@ -544,6 +586,24 @@ public class GobblinClusterManager implements ApplicationLauncher {
}
}
+ private class Metrics extends StandardMetrics {
+ public static final String CLUSTER_LEADERSHIP_CHANGE = "clusterLeadershipChange";
+ private ContextAwareHistogram clusterLeadershipChange;
+ public Metrics(final MetricContext metricContext) {
+ clusterLeadershipChange = metricContext.contextAwareHistogram(CLUSTER_LEADERSHIP_CHANGE, 1, TimeUnit.MINUTES);
+ }
+
+ @Override
+ public String getName() {
+ return GobblinClusterManager.class.getName();
+ }
+
+ @Override
+ public Collection<ContextAwareHistogram> getHistograms() {
+ return ImmutableList.of(this.clusterLeadershipChange);
+ }
+ }
+
/**
* A custom {@link MessageHandlerFactory} for {@link MessageHandler}s that handle messages of type
* "SHUTDOWN" for shutting down the controller.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 3a25df7..9fd5add 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.cluster;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -28,12 +27,13 @@ import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.ContextAwareHistogram;
import org.apache.hadoop.fs.Path;
import org.apache.helix.HelixManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.codahale.metrics.Gauge;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -50,8 +50,6 @@ import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.instrumented.StandardMetricsBridge;
import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.ContextAwareGauge;
-import org.apache.gobblin.metrics.ContextAwareHistogram;
-import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.ContextAwareTimer;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.MetricContext;
@@ -69,7 +67,6 @@ import org.apache.gobblin.scheduler.SchedulerService;
import javax.annotation.Nonnull;
-import lombok.AllArgsConstructor;
/**
@@ -96,7 +93,7 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
private final ConcurrentHashMap<String, Boolean> jobRunningMap;
private final MutableJobCatalog jobCatalog;
private final MetricContext metricContext;
- private final InnerStandardMetrics metrics;
+ private final Metrics metrics;
public GobblinHelixJobScheduler(Properties properties, HelixManager helixManager, EventBus eventBus,
Path appWorkDir, List<? extends Tag<?>> metadataTags, SchedulerService schedulerService,
@@ -109,16 +106,8 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
this.appWorkDir = appWorkDir;
this.metadataTags = metadataTags;
this.jobCatalog = jobCatalog;
- this.metricContext = getDefaultMetricContext(properties);
- this.metrics = new InnerStandardMetrics(this.metricContext);
- }
-
- public MetricContext getDefaultMetricContext(Properties properties) {
- org.apache.gobblin.configuration.State fakeState =
- new org.apache.gobblin.configuration.State(properties);
- List<Tag<?>> tags = new ArrayList<>();
- MetricContext res = Instrumented.getMetricContext(fakeState, GobblinHelixJobScheduler.class, tags);
- return res;
+ this.metricContext = Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(properties), this.getClass());
+ this.metrics = new Metrics(this.metricContext);
}
@Nonnull
@@ -148,36 +137,61 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
}
@Override
- public StandardMetricsBridge.StandardMetrics getStandardMetrics() {
+ public StandardMetrics getStandardMetrics() {
return metrics;
}
- private class InnerStandardMetrics implements StandardMetrics {
+ private class Metrics extends StandardMetrics {
private final ContextAwareCounter numJobsLaunched;
private final ContextAwareCounter numJobsCompleted;
private final ContextAwareCounter numJobsCommitted;
private final ContextAwareCounter numJobsFailed;
private final ContextAwareCounter numJobsCancelled;
+ private final ContextAwareHistogram histogramJobsLaunched;
+ private final ContextAwareHistogram histogramJobsCompleted;
+ private final ContextAwareHistogram histogramJobsCommitted;
+ private final ContextAwareHistogram histogramJobsFailed;
+ private final ContextAwareHistogram histogramJobsCancelled;
+
private final ContextAwareGauge<Integer> numJobsRunning;
private final ContextAwareTimer timeForJobCompletion;
private final ContextAwareTimer timeForJobFailure;
+ private final ContextAwareTimer timeBeforeJobScheduling;
+ private final ContextAwareTimer timeBeforeJobLaunching;
- public InnerStandardMetrics(final MetricContext metricContext) {
+ public Metrics(final MetricContext metricContext) {
+ // All historical counters
this.numJobsLaunched = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_LAUNCHED_COUNTER);
this.numJobsCompleted = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMPLETED_COUNTER);
this.numJobsCommitted = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMMITTED_COUNTER);
this.numJobsFailed = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_FAILED_COUNTER);
this.numJobsCancelled = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_CANCELLED_COUNTER);
+
+ // Counters within last 1 minute
+ this.histogramJobsLaunched = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_LAUNCHED_HISTOGRAM, 1, TimeUnit.MINUTES);
+ this.histogramJobsCompleted = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMPLETED_HISTOGRAM, 1, TimeUnit.MINUTES);
+ this.histogramJobsCommitted = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMMITTED_HISTOGRAM, 1, TimeUnit.MINUTES);
+ this.histogramJobsFailed = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_FAILED_HISTOGRAM, 1, TimeUnit.MINUTES);
+ this.histogramJobsCancelled = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_CANCELLED_HISTOGRAM, 1, TimeUnit.MINUTES);
+
this.numJobsRunning = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_RUNNING_GAUGE,
- new Gauge<Integer>() {
- @Override public Integer getValue() {
- return (int)(InnerStandardMetrics.this.numJobsLaunched.getCount() -
- InnerStandardMetrics.this.numJobsCompleted.getCount());
- }
- });
- this.timeForJobCompletion = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_COMPLETION);
- this.timeForJobFailure = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_FAILURE);
+ ()->(int)(Metrics.this.numJobsLaunched.getCount() - Metrics.this.numJobsCompleted.getCount()));
+
+ this.timeForJobCompletion = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_COMPLETION, 1, TimeUnit.MINUTES);
+ this.timeForJobFailure = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_FAILURE,1, TimeUnit.MINUTES);
+ this.timeBeforeJobScheduling = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BEFORE_JOB_SCHEDULING, 1, TimeUnit.MINUTES);
+ this.timeBeforeJobLaunching = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BEFORE_JOB_LAUNCHING, 1, TimeUnit.MINUTES);
+ }
+
+ private void updateTimeBeforeJobScheduling (Properties jobConfig) {
+ long jobCreationTime = Long.parseLong(jobConfig.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "0"));
+ Instrumented.updateTimer(Optional.of(timeBeforeJobScheduling), System.currentTimeMillis() - jobCreationTime, TimeUnit.MILLISECONDS);
+ }
+
+ private void updateTimeBeforeJobLaunching (Properties jobConfig) {
+ long jobCreationTime = Long.parseLong(jobConfig.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "0"));
+ Instrumented.updateTimer(Optional.of(timeBeforeJobLaunching), System.currentTimeMillis() - jobCreationTime, TimeUnit.MILLISECONDS);
}
@Override
@@ -202,25 +216,20 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
}
@Override
- public Collection<ContextAwareMeter> getMeters() {
- return null;
- }
-
- @Override
public Collection<ContextAwareTimer> getTimers() {
- return ImmutableList.of(timeForJobCompletion, timeForJobFailure);
+ return ImmutableList.of(timeForJobCompletion, timeForJobFailure, timeBeforeJobScheduling, timeBeforeJobLaunching);
}
@Override
public Collection<ContextAwareHistogram> getHistograms() {
- return null;
+ return ImmutableList.of(histogramJobsCompleted, histogramJobsLaunched, histogramJobsFailed, histogramJobsCancelled, histogramJobsCommitted);
}
}
private class MetricsTrackingListener extends AbstractJobListener {
- private final InnerStandardMetrics metrics;
+ private final Metrics metrics;
private static final String START_TIME = "startTime";
- MetricsTrackingListener(InnerStandardMetrics metrics) {
+ MetricsTrackingListener(Metrics metrics) {
this.metrics = metrics;
}
@@ -308,6 +317,9 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
Properties jobConfig = new Properties();
jobConfig.putAll(this.properties);
jobConfig.putAll(newJobArrival.getJobConfig());
+
+ metrics.updateTimeBeforeJobScheduling(jobConfig);
+
if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
LOGGER.info("Scheduling job " + newJobArrival.getJobName());
scheduleJob(jobConfig, new MetricsTrackingListener(metrics));
@@ -365,6 +377,7 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
@Override
public void run() {
try {
+ ((MetricsTrackingListener)jobListener).metrics.updateTimeBeforeJobLaunching(this.jobConfig);
GobblinHelixJobScheduler.this.runJob(this.jobConfig, this.jobListener);
// remove non-scheduled job catalog once done so it won't be re-executed
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java
index 087cf39..3993dce 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java
@@ -25,6 +25,9 @@ import org.apache.gobblin.metrics.ContextAwareHistogram;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.ContextAwareTimer;
+import com.google.common.collect.ImmutableList;
+
+
/**
* This interface indicates a class will expose its metrics to some external systems.
*/
@@ -32,12 +35,30 @@ public interface StandardMetricsBridge extends Instrumentable {
StandardMetrics getStandardMetrics();
- interface StandardMetrics {
- String getName();
- Collection<ContextAwareGauge<?>> getGauges();
- Collection<ContextAwareCounter> getCounters();
- Collection<ContextAwareMeter> getMeters();
- Collection<ContextAwareTimer> getTimers();
- Collection<ContextAwareHistogram> getHistograms();
+ public class StandardMetrics {
+
+ public String getName() {
+ return this.getClass().getName();
+ }
+
+ public Collection<ContextAwareGauge<?>> getGauges() {
+ return ImmutableList.of();
+ }
+
+ public Collection<ContextAwareCounter> getCounters() {
+ return ImmutableList.of();
+ }
+
+ public Collection<ContextAwareMeter> getMeters() {
+ return ImmutableList.of();
+ }
+
+ public Collection<ContextAwareTimer> getTimers() {
+ return ImmutableList.of();
+ }
+
+ public Collection<ContextAwareHistogram> getHistograms() {
+ return ImmutableList.of();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareHistogram.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareHistogram.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareHistogram.java
index a940715..a45f928 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareHistogram.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareHistogram.java
@@ -17,10 +17,13 @@
package org.apache.gobblin.metrics;
+import java.util.concurrent.TimeUnit;
+
import lombok.experimental.Delegate;
import com.codahale.metrics.ExponentiallyDecayingReservoir;
import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingTimeWindowReservoir;
import org.apache.gobblin.metrics.metric.InnerMetric;
@@ -52,7 +55,12 @@ public class ContextAwareHistogram extends Histogram implements ContextAwareMetr
super(new ExponentiallyDecayingReservoir());
this.innerHistogram = new InnerHistogram(context, name, this);
this.context = context;
+ }
+ ContextAwareHistogram(MetricContext context, String name, long windowSize, TimeUnit unit) {
+ super(new SlidingTimeWindowReservoir(windowSize, unit));
+ this.innerHistogram = new InnerHistogram(context, name, this, windowSize, unit);
+ this.context = context;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareMetricFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareMetricFactory.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareMetricFactory.java
index d699b06..e9f6d62 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareMetricFactory.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareMetricFactory.java
@@ -23,7 +23,6 @@ import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.Timer;
-
/**
* An interface for factory classes for {@link ContextAwareMetric}s.
*
@@ -49,6 +48,10 @@ public interface ContextAwareMetricFactory<T extends ContextAwareMetric> {
*/
public T newMetric(MetricContext context, String name);
+ default public T newMetric(ContextAwareMetricFactoryArgs args) {
+ return null;
+ }
+
/**
* Check if a given metric is an instance of the type of context-aware metrics created by this
* {@link ContextAwareMetricFactory}.
@@ -102,6 +105,15 @@ public interface ContextAwareMetricFactory<T extends ContextAwareMetric> {
}
@Override
+ public ContextAwareHistogram newMetric(ContextAwareMetricFactoryArgs args) {
+ if (args instanceof ContextAwareMetricFactoryArgs.SlidingTimeWindowArgs) {
+ ContextAwareMetricFactoryArgs.SlidingTimeWindowArgs windowArgs = (ContextAwareMetricFactoryArgs.SlidingTimeWindowArgs)args;
+ return new ContextAwareHistogram(windowArgs.getContext(), windowArgs.getName(), windowArgs.getWindowSize(), windowArgs.getUnit());
+ }
+ throw new UnsupportedOperationException("Unknown factory arguments to create ContextAwareHistogram");
+ }
+
+ @Override
public boolean isInstance(Metric metric) {
return Histogram.class.isInstance(metric);
}
@@ -118,6 +130,15 @@ public interface ContextAwareMetricFactory<T extends ContextAwareMetric> {
}
@Override
+ public ContextAwareTimer newMetric(ContextAwareMetricFactoryArgs args) {
+ if (args instanceof ContextAwareMetricFactoryArgs.SlidingTimeWindowArgs) {
+ ContextAwareMetricFactoryArgs.SlidingTimeWindowArgs windowArgs = (ContextAwareMetricFactoryArgs.SlidingTimeWindowArgs)args;
+ return new ContextAwareTimer(windowArgs.getContext(), windowArgs.getName(), windowArgs.getWindowSize(), windowArgs.getUnit());
+ }
+ throw new UnsupportedOperationException("Unknown factory arguments to create ContextAwareTimer");
+ }
+
+ @Override
public boolean isInstance(Metric metric) {
return Timer.class.isInstance(metric);
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareMetricFactoryArgs.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareMetricFactoryArgs.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareMetricFactoryArgs.java
new file mode 100644
index 0000000..1e7e077
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareMetricFactoryArgs.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics;
+
+import java.util.concurrent.TimeUnit;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+
+/**
+ * A class which wraps all arguments required by {@link ContextAwareMetricFactory}s.
+ *
+ * A concrete {@link ContextAwareMetricFactory} knows how to interpret this class into its corresponding sub-type.
+ */
+@AllArgsConstructor
+@Getter
+public class ContextAwareMetricFactoryArgs {
+ protected final MetricContext context;
+ protected final String name;
+
+ @Getter
+ public static class SlidingTimeWindowArgs extends ContextAwareMetricFactoryArgs {
+ protected final long windowSize;
+ protected final TimeUnit unit;
+ public SlidingTimeWindowArgs(MetricContext context, String name, long windowSize, TimeUnit unit) {
+ super(context, name);
+ this.windowSize = windowSize;
+ this.unit = unit;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareTimer.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareTimer.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareTimer.java
index 33d68ed..839df20 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareTimer.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareTimer.java
@@ -17,6 +17,9 @@
package org.apache.gobblin.metrics;
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.SlidingTimeWindowReservoir;
import com.codahale.metrics.Timer;
import org.apache.gobblin.metrics.metric.InnerMetric;
@@ -51,6 +54,12 @@ public class ContextAwareTimer extends Timer implements ContextAwareMetric {
this.context = context;
}
+ ContextAwareTimer(MetricContext context, String name, long windowSize, TimeUnit unit) {
+ super(new SlidingTimeWindowReservoir(windowSize, unit));
+ this.innerTimer = new InnerTimer(context, name, this, windowSize, unit);
+ this.context = context;
+ }
+
@Override
public MetricContext getContext() {
return this.context;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerHistogram.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerHistogram.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerHistogram.java
index 7fb5401..66f8fd2 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerHistogram.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerHistogram.java
@@ -18,9 +18,11 @@
package org.apache.gobblin.metrics;
import java.lang.ref.WeakReference;
+import java.util.concurrent.TimeUnit;
import com.codahale.metrics.ExponentiallyDecayingReservoir;
import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingTimeWindowReservoir;
import com.google.common.base.Optional;
import org.apache.gobblin.metrics.metric.InnerMetric;
@@ -50,6 +52,21 @@ public class InnerHistogram extends Histogram implements InnerMetric {
this.contextAwareHistogram = new WeakReference<>(contextAwareHistogram);
}
+ InnerHistogram(MetricContext context, String name, ContextAwareHistogram contextAwareHistogram, long windowSize, TimeUnit unit) {
+ super(new SlidingTimeWindowReservoir(windowSize, unit));
+
+ this.name = name;
+
+ Optional<MetricContext> parentContext = context.getParent();
+ if (parentContext.isPresent()) {
+ this.parentHistogram = Optional.fromNullable(parentContext.get().contextAwareHistogram(name, windowSize, unit));
+ } else {
+ this.parentHistogram = Optional.absent();
+ }
+
+ this.contextAwareHistogram = new WeakReference<>(contextAwareHistogram);
+ }
+
@Override
public void update(int value) {
update((long) value);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java
index 70502ec..2738974 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java
@@ -368,6 +368,23 @@ public class InnerMetricContext extends MetricRegistry implements ReportableCont
return newMetric;
}
+ @SuppressWarnings("unchecked")
+ protected synchronized <T extends ContextAwareMetric> T getOrCreate(
+ ContextAwareMetricFactory<T> factory, ContextAwareMetricFactoryArgs args) {
+ String name = args.getName();
+ InnerMetric metric = this.contextAwareMetrics.get(name);
+ if (metric != null) {
+ if (factory.isInstance(metric)) {
+ return (T) metric.getContextAwareMetric();
+ }
+ throw new IllegalArgumentException(name + " is already used for a different type of metric");
+ }
+
+ T newMetric = factory.newMetric(args);
+ this.register(name, newMetric);
+ return newMetric;
+ }
+
private boolean removeChildrenMetrics(String name) {
boolean removed = true;
for (MetricContext child : getChildContextsAsMap().values()) {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerTimer.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerTimer.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerTimer.java
index db4fc0a..be3517f 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerTimer.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerTimer.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.metrics;
import java.lang.ref.WeakReference;
import java.util.concurrent.TimeUnit;
+import com.codahale.metrics.SlidingTimeWindowReservoir;
import com.codahale.metrics.Timer;
import com.google.common.base.Optional;
@@ -47,6 +48,19 @@ public class InnerTimer extends Timer implements InnerMetric {
this.timer = new WeakReference<>(contextAwareTimer);
}
+ /**
+ * Create a timer backed by a {@link SlidingTimeWindowReservoir} of the given window.
+ * When {@code context} has a parent, the parent's timer is obtained with the same name and window
+ * and kept as the parent timer; otherwise the parent timer is absent.
+ */
+ InnerTimer(MetricContext context, String name, ContextAwareTimer contextAwareTimer, long windowSize, TimeUnit unit) {
+ super(new SlidingTimeWindowReservoir(windowSize, unit));
+ this.name = name;
+ this.timer = new WeakReference<>(contextAwareTimer);
+
+ Optional<MetricContext> parent = context.getParent();
+ if (!parent.isPresent()) {
+ this.parentTimer = Optional.absent();
+ } else {
+ this.parentTimer = Optional.fromNullable(parent.get().contextAwareTimer(name, windowSize, unit));
+ }
+ }
+
@Override
public void update(long duration, TimeUnit unit) {
super.update(duration, unit);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
index dcc1029..46f8ab1 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
@@ -465,19 +465,21 @@ public class MetricContext extends MetricRegistry implements ReportableContext,
* @return the {@link ContextAwareHistogram} with the given name
*/
public ContextAwareHistogram contextAwareHistogram(String name) {
- return contextAwareHistogram(name, ContextAwareMetricFactory.DEFAULT_CONTEXT_AWARE_HISTOGRAM_FACTORY);
+ return this.innerMetricContext.getOrCreate(name, ContextAwareMetricFactory.DEFAULT_CONTEXT_AWARE_HISTOGRAM_FACTORY);
}
/**
- * Get a {@link ContextAwareHistogram} with a given name.
+ * Get a {@link ContextAwareHistogram} with a given name and a customized {@link com.codahale.metrics.SlidingTimeWindowReservoir}
*
* @param name name of the {@link ContextAwareHistogram}
- * @param factory a {@link ContextAwareMetricFactory} for building {@link ContextAwareHistogram}s
+ * @param windowSize normally the duration of the time window
+ * @param unit the unit of time
* @return the {@link ContextAwareHistogram} with the given name
*/
- public ContextAwareHistogram contextAwareHistogram(String name,
- ContextAwareMetricFactory<ContextAwareHistogram> factory) {
- return this.innerMetricContext.getOrCreate(name, factory);
+ public ContextAwareHistogram contextAwareHistogram(String name, long windowSize, TimeUnit unit) {
+ // Bundle the context from innerMetricContext, the metric name and the window into factory args,
+ // then look up or create the histogram under that name.
+ return this.innerMetricContext.getOrCreate(
+ ContextAwareMetricFactory.DEFAULT_CONTEXT_AWARE_HISTOGRAM_FACTORY,
+ new ContextAwareMetricFactoryArgs.SlidingTimeWindowArgs(
+ this.innerMetricContext.getMetricContext().get(), name, windowSize, unit));
}
/**
@@ -487,18 +489,21 @@ public class MetricContext extends MetricRegistry implements ReportableContext,
* @return the {@link ContextAwareTimer} with the given name
*/
public ContextAwareTimer contextAwareTimer(String name) {
- return contextAwareTimer(name, ContextAwareMetricFactory.DEFAULT_CONTEXT_AWARE_TIMER_FACTORY);
+ return this.innerMetricContext.getOrCreate(name, ContextAwareMetricFactory.DEFAULT_CONTEXT_AWARE_TIMER_FACTORY);
}
/**
- * Get a {@link ContextAwareTimer} with a given name.
+ * Get a {@link ContextAwareTimer} with a given name and a customized {@link com.codahale.metrics.SlidingTimeWindowReservoir}
*
* @param name name of the {@link ContextAwareTimer}
- * @param factory a {@link ContextAwareMetricFactory} for building {@link ContextAwareTimer}s
+ * @param windowSize normally the duration of the time window
+ * @param unit the unit of time
* @return the {@link ContextAwareTimer} with the given name
*/
- public ContextAwareTimer contextAwareTimer(String name, ContextAwareMetricFactory<ContextAwareTimer> factory) {
- return this.innerMetricContext.getOrCreate(name, factory);
+ public ContextAwareTimer contextAwareTimer(String name, long windowSize, TimeUnit unit) {
+ // Bundle the context from innerMetricContext, the metric name and the window into factory args,
+ // then look up or create the timer under that name.
+ return this.innerMetricContext.getOrCreate(
+ ContextAwareMetricFactory.DEFAULT_CONTEXT_AWARE_TIMER_FACTORY,
+ new ContextAwareMetricFactoryArgs.SlidingTimeWindowArgs(
+ this.innerMetricContext.getMetricContext().get(), name, windowSize, unit));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
index df1dc44..6c0ea5b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
@@ -17,28 +17,30 @@
package org.apache.gobblin.runtime.api;
import java.net.URI;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.List;
+import java.util.concurrent.TimeUnit;
import com.codahale.metrics.Gauge;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.Service;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.instrumented.GobblinMetricsKeys;
import org.apache.gobblin.instrumented.Instrumentable;
+import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.instrumented.StandardMetricsBridge;
import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.ContextAwareHistogram;
-import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.ContextAwareTimer;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
/**
* A catalog of all the {@link JobSpec}s a Gobblin instance is currently aware of.
@@ -50,10 +52,9 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable
/** Metrics for the job catalog; null if
* ({@link #isInstrumentationEnabled()}) is false. */
- StandardMetrics getMetrics();
+ JobCatalog.StandardMetrics getMetrics();
- @Override
- default StandardMetrics getStandardMetrics() {
+ default StandardMetricsBridge.StandardMetrics getStandardMetrics() {
return getMetrics();
}
@@ -63,11 +64,16 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable
**/
JobSpec getJobSpec(URI uri) throws JobSpecNotFoundException;
- public static class StandardMetrics implements JobCatalogListener, StandardMetricsBridge.StandardMetrics {
+ @Slf4j
+ public static class StandardMetrics extends StandardMetricsBridge.StandardMetrics implements JobCatalogListener {
public static final String NUM_ACTIVE_JOBS_NAME = "numActiveJobs";
public static final String NUM_ADDED_JOBS = "numAddedJobs";
public static final String NUM_DELETED_JOBS = "numDeletedJobs";
public static final String NUM_UPDATED_JOBS = "numUpdatedJobs";
+ public static final String TIME_FOR_JOB_CATALOG_GET = "timeForJobCatalogGet";
+ public static final String HISTOGRAM_FOR_JOB_ADD = "histogramForJobAdd";
+ public static final String HISTOGRAM_FOR_JOB_UPDATE = "histogramForJobUpdate";
+ public static final String HISTOGRAM_FOR_JOB_DELETE = "histogramForJobDelete";
public static final String TRACKING_EVENT_NAME = "JobCatalogEvent";
public static final String JOB_ADDED_OPERATION_TYPE = "JobAdded";
public static final String JOB_DELETED_OPERATION_TYPE = "JobDeleted";
@@ -77,22 +83,36 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable
@Getter private final ContextAwareCounter numAddedJobs;
@Getter private final ContextAwareCounter numDeletedJobs;
@Getter private final ContextAwareCounter numUpdatedJobs;
-
- public StandardMetrics(final JobCatalog parent) {
- this.numAddedJobs = parent.getMetricContext().contextAwareCounter(NUM_ADDED_JOBS);
- this.numDeletedJobs = parent.getMetricContext().contextAwareCounter(NUM_DELETED_JOBS);
- this.numUpdatedJobs = parent.getMetricContext().contextAwareCounter(NUM_UPDATED_JOBS);
- this.numActiveJobs = parent.getMetricContext().newContextAwareGauge(NUM_ACTIVE_JOBS_NAME,
- new Gauge<Integer>() {
- @Override public Integer getValue() {
- return parent.getJobs().size();
- }
+ @Getter private final ContextAwareTimer timeForJobCatalogGet;
+ @Getter private final ContextAwareHistogram histogramForJobAdd;
+ @Getter private final ContextAwareHistogram histogramForJobUpdate;
+ @Getter private final ContextAwareHistogram histogramForJobDelete;
+
+ public StandardMetrics(final JobCatalog jobCatalog) {
+ MetricContext context = jobCatalog.getMetricContext();
+ this.timeForJobCatalogGet = context.contextAwareTimer(TIME_FOR_JOB_CATALOG_GET, 1, TimeUnit.MINUTES);
+ this.numAddedJobs = context.contextAwareCounter(NUM_ADDED_JOBS);
+ this.numDeletedJobs = context.contextAwareCounter(NUM_DELETED_JOBS);
+ this.numUpdatedJobs = context.contextAwareCounter(NUM_UPDATED_JOBS);
+ this.numActiveJobs = context.newContextAwareGauge(NUM_ACTIVE_JOBS_NAME, ()->{
+ long startTime = System.currentTimeMillis();
+ int size = jobCatalog.getJobs().size();
+ updateGetJobTime(startTime);
+ return size;
});
- parent.addListener(this);
+ this.histogramForJobAdd = context.contextAwareHistogram(HISTOGRAM_FOR_JOB_ADD, 1, TimeUnit.MINUTES);
+ this.histogramForJobUpdate = context.contextAwareHistogram(HISTOGRAM_FOR_JOB_UPDATE, 1, TimeUnit.MINUTES);
+ this.histogramForJobDelete = context.contextAwareHistogram(HISTOGRAM_FOR_JOB_DELETE, 1, TimeUnit.MINUTES);
+ }
+
+ /**
+ * Record on {@code timeForJobCatalogGet} the time elapsed since {@code startTime}.
+ *
+ * @param startTime start of the measured call, as a {@link System#currentTimeMillis()} timestamp
+ */
+ public void updateGetJobTime(long startTime) {
+ log.info("updateGetJobTime...");
+ long elapsedMillis = System.currentTimeMillis() - startTime;
+ Instrumented.updateTimer(Optional.of(this.timeForJobCatalogGet), elapsedMillis, TimeUnit.MILLISECONDS);
}
@Override public void onAddJob(JobSpec addedJob) {
this.numAddedJobs.inc();
+ this.histogramForJobAdd.update(1);
submitTrackingEvent(addedJob, JOB_ADDED_OPERATION_TYPE);
}
@@ -116,44 +136,35 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable
@Override
public void onDeleteJob(URI deletedJobURI, String deletedJobVersion) {
this.numDeletedJobs.inc();
+ this.histogramForJobDelete.update(1);
submitTrackingEvent(deletedJobURI, deletedJobVersion, JOB_DELETED_OPERATION_TYPE);
}
@Override
public void onUpdateJob(JobSpec updatedJob) {
this.numUpdatedJobs.inc();
+ this.histogramForJobUpdate.update(1);
submitTrackingEvent(updatedJob, JOB_UPDATED_OPERATION_TYPE);
}
@Override
- public String getName() {
- return "JobCatalog";
- }
-
- @Override
public Collection<ContextAwareGauge<?>> getGauges() {
return Collections.singleton(this.numActiveJobs);
}
@Override
public Collection<ContextAwareCounter> getCounters() {
- List<ContextAwareCounter> counters = ImmutableList.of(numAddedJobs, numDeletedJobs, numDeletedJobs);
- return counters;
- }
-
- @Override
- public Collection<ContextAwareMeter> getMeters() {
- return null;
+ return ImmutableList.of(numAddedJobs, numDeletedJobs, numUpdatedJobs);
}
@Override
public Collection<ContextAwareTimer> getTimers() {
- return null;
+ return ImmutableList.of(timeForJobCatalogGet);
}
@Override
public Collection<ContextAwareHistogram> getHistograms() {
- return null;
+ return ImmutableList.of(histogramForJobAdd, histogramForJobDelete, histogramForJobUpdate);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
index e63cb4a..fb54139 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
@@ -43,8 +43,16 @@ public interface JobExecutionLauncher extends Instrumentable {
public static final String NUM_JOBS_FAILED_COUNTER = "numJobsFailed";
public static final String NUM_JOBS_CANCELLED_COUNTER = "numJobsCancelled";
public static final String NUM_JOBS_RUNNING_GAUGE = "numJobsRunning";
+ public static final String NUM_JOBS_LAUNCHED_HISTOGRAM = "histogramJobsLaunched";
+ public static final String NUM_JOBS_COMPLETED_HISTOGRAM = "histogramJobsCompleted";
+ public static final String NUM_JOBS_COMMITTED_HISTOGRAM = "histogramJobsCommitted";
+ public static final String NUM_JOBS_FAILED_HISTOGRAM = "histogramJobsFailed";
+ public static final String NUM_JOBS_CANCELLED_HISTOGRAM = "histogramJobsCancelled";
+
public static final String TIMER_FOR_JOB_COMPLETION = "timerForJobCompletion";
public static final String TIMER_FOR_JOB_FAILURE = "timerForJobFailure";
+ public static final String TIMER_BEFORE_JOB_SCHEDULING = "timerBeforeJobScheduling";
+ public static final String TIMER_BEFORE_JOB_LAUNCHING = "timerBeforeJobLaunching";
public static final String TRACKING_EVENT_NAME = "JobExecutionLauncherEvent";
public static final String JOB_EXECID_META = "jobExecId";
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java
index b5c82b7..8b6e98c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java
@@ -17,8 +17,19 @@
package org.apache.gobblin.runtime.api;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareTimer;
+
+import com.google.common.base.Optional;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
/**
* A {@link JobCatalog} that can have its {@link Collection} of {@link JobSpec}s modified
@@ -37,4 +48,36 @@ public interface MutableJobCatalog extends JobCatalog {
* Removes an existing JobSpec with the given URI. A no-op if such JobSpec does not exist.
*/
void remove(URI uri);
+
+ /**
+ * {@link JobCatalog.StandardMetrics} extended with timers for the put and remove operations
+ * of a mutable job catalog.
+ */
+ @Slf4j
+ public static class MutableStandardMetrics extends JobCatalog.StandardMetrics {
+ public static final String TIME_FOR_JOB_CATALOG_REMOVE = "timeForJobCatalogRemove";
+ public static final String TIME_FOR_JOB_CATALOG_PUT = "timeForJobCatalogPut";
+ @Getter private final ContextAwareTimer timeForJobCatalogPut;
+ @Getter private final ContextAwareTimer timeForJobCatalogRemove;
+
+ /** Create the base metrics for {@code catalog} and add the put/remove timers on its metric context. */
+ public MutableStandardMetrics(JobCatalog catalog) {
+ super(catalog);
+ this.timeForJobCatalogPut =
+ catalog.getMetricContext().contextAwareTimer(TIME_FOR_JOB_CATALOG_PUT, 1, TimeUnit.MINUTES);
+ this.timeForJobCatalogRemove =
+ catalog.getMetricContext().contextAwareTimer(TIME_FOR_JOB_CATALOG_REMOVE, 1, TimeUnit.MINUTES);
+ }
+
+ /** Record on the put timer the milliseconds elapsed since {@code startTime}. */
+ public void updatePutJobTime(long startTime) {
+ log.info("updatePutJobTime...");
+ long elapsedMillis = System.currentTimeMillis() - startTime;
+ Instrumented.updateTimer(Optional.of(this.timeForJobCatalogPut), elapsedMillis, TimeUnit.MILLISECONDS);
+ }
+
+ /** Record on the remove timer the milliseconds elapsed since {@code startTime}. */
+ public void updateRemoveJobTime(long startTime) {
+ log.info("updateRemoveJobTime...");
+ long elapsedMillis = System.currentTimeMillis() - startTime;
+ Instrumented.updateTimer(Optional.of(this.timeForJobCatalogRemove), elapsedMillis, TimeUnit.MILLISECONDS);
+ }
+
+ /** @return the timers of the parent class followed by the put and remove timers */
+ @Override
+ public Collection<ContextAwareTimer> getTimers() {
+ Collection<ContextAwareTimer> timers = new ArrayList<>(super.getTimers());
+ timers.add(this.timeForJobCatalogPut);
+ timers.add(this.timeForJobCatalogRemove);
+ return timers;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
index f63600a..3aa16be 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
@@ -18,7 +18,17 @@
package org.apache.gobblin.runtime.api;
import java.net.URI;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareTimer;
+
+import com.google.common.base.Optional;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
/**
@@ -38,4 +48,36 @@ public interface MutableSpecCatalog extends SpecCatalog {
* Throws SpecNotFoundException if such {@link Spec} does not exist.
*/
void remove(URI uri) throws SpecNotFoundException;
+
+ /**
+ * {@link StandardMetrics} extended with timers for the put and remove operations
+ * of a mutable spec catalog.
+ */
+ @Slf4j
+ public static class MutableStandardMetrics extends StandardMetrics {
+ public static final String TIME_FOR_SPEC_CATALOG_REMOVE = "timeForSpecCatalogRemove";
+ public static final String TIME_FOR_SPEC_CATALOG_PUT = "timeForSpecCatalogPut";
+ @Getter private final ContextAwareTimer timeForSpecCatalogPut;
+ @Getter private final ContextAwareTimer timeForSpecCatalogRemove;
+
+ /** Create the base metrics for {@code catalog} and add the put/remove timers on its metric context. */
+ public MutableStandardMetrics(SpecCatalog catalog) {
+ super(catalog);
+ this.timeForSpecCatalogPut =
+ catalog.getMetricContext().contextAwareTimer(TIME_FOR_SPEC_CATALOG_PUT, 1, TimeUnit.MINUTES);
+ this.timeForSpecCatalogRemove =
+ catalog.getMetricContext().contextAwareTimer(TIME_FOR_SPEC_CATALOG_REMOVE, 1, TimeUnit.MINUTES);
+ }
+
+ /** Record on the put timer the milliseconds elapsed since {@code startTime}. */
+ public void updatePutSpecTime(long startTime) {
+ log.info("updatePutSpecTime...");
+ long elapsedMillis = System.currentTimeMillis() - startTime;
+ Instrumented.updateTimer(Optional.of(this.timeForSpecCatalogPut), elapsedMillis, TimeUnit.MILLISECONDS);
+ }
+
+ /** Record on the remove timer the milliseconds elapsed since {@code startTime}. */
+ public void updateRemoveSpecTime(long startTime) {
+ log.info("updateRemoveSpecTime...");
+ long elapsedMillis = System.currentTimeMillis() - startTime;
+ Instrumented.updateTimer(Optional.of(this.timeForSpecCatalogRemove), elapsedMillis, TimeUnit.MILLISECONDS);
+ }
+
+ /** @return the timers of the parent class followed by the put and remove timers */
+ @Override
+ public Collection<ContextAwareTimer> getTimers() {
+ Collection<ContextAwareTimer> timers = new ArrayList<>(super.getTimers());
+ timers.add(this.timeForSpecCatalogPut);
+ timers.add(this.timeForSpecCatalogRemove);
+ return timers;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
index d2af9b8..4b85ea9 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
@@ -19,20 +19,29 @@ package org.apache.gobblin.runtime.api;
import java.net.URI;
import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
import com.codahale.metrics.Gauge;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.gobblin.instrumented.GobblinMetricsKeys;
-import org.apache.gobblin.instrumented.Instrumentable;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.ContextAwareHistogram;
+import org.apache.gobblin.metrics.ContextAwareTimer;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
-public interface SpecCatalog extends SpecCatalogListenersContainer, Instrumentable {
+public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetricsBridge {
/** Returns an immutable {@link Collection} of {@link Spec}s that are known to the catalog. */
Collection<Spec> getSpecs();
@@ -40,13 +49,18 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, Instrumentab
* ({@link #isInstrumentationEnabled()}) is false. */
SpecCatalog.StandardMetrics getMetrics();
+ default StandardMetricsBridge.StandardMetrics getStandardMetrics() {
+ return this.getMetrics();
+ }
+
/**
* Get a {@link Spec} by uri.
* @throws SpecNotFoundException if no such Spec exists
**/
Spec getSpec(URI uri) throws SpecNotFoundException;
- public static class StandardMetrics implements SpecCatalogListener {
+ @Slf4j
+ public static class StandardMetrics extends StandardMetricsBridge.StandardMetrics implements SpecCatalogListener {
public static final String NUM_ACTIVE_SPECS_NAME = "numActiveSpecs";
public static final String NUM_ADDED_SPECS = "numAddedSpecs";
public static final String NUM_DELETED_SPECS = "numDeletedSpecs";
@@ -55,28 +69,65 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, Instrumentab
public static final String SPEC_ADDED_OPERATION_TYPE = "SpecAdded";
public static final String SPEC_DELETED_OPERATION_TYPE = "SpecDeleted";
public static final String SPEC_UPDATED_OPERATION_TYPE = "SpecUpdated";
+ public static final String TIME_FOR_SPEC_CATALOG_GET = "timeForSpecCatalogGet";
+ public static final String HISTOGRAM_FOR_SPEC_ADD = "histogramForSpecAdd";
+ public static final String HISTOGRAM_FOR_SPEC_UPDATE = "histogramForSpecUpdate";
+ public static final String HISTOGRAM_FOR_SPEC_DELETE = "histogramForSpecDelete";
- @Getter
- private final ContextAwareGauge<Integer> numActiveSpecs;
+ @Getter private final ContextAwareGauge<Integer> numActiveSpecs;
@Getter private final ContextAwareCounter numAddedSpecs;
@Getter private final ContextAwareCounter numDeletedSpecs;
@Getter private final ContextAwareCounter numUpdatedSpecs;
+ @Getter private final ContextAwareTimer timeForSpecCatalogGet;
+ @Getter private final ContextAwareHistogram histogramForSpecAdd;
+ @Getter private final ContextAwareHistogram histogramForSpecUpdate;
+ @Getter private final ContextAwareHistogram histogramForSpecDelete;
+
+ /**
+ * Create the timer, counters, gauge and histograms for {@code specCatalog} on its {@link MetricContext}.
+ * The timer and the three histograms are requested with a window of 1 {@link TimeUnit#MINUTES}.
+ *
+ * @param specCatalog catalog whose metric context hosts the metrics and whose specs the gauge counts
+ */
+ public StandardMetrics(final SpecCatalog specCatalog) {
+ MetricContext context = specCatalog.getMetricContext();
+ this.timeForSpecCatalogGet = context.contextAwareTimer(TIME_FOR_SPEC_CATALOG_GET, 1, TimeUnit.MINUTES);
+ this.numAddedSpecs = context.contextAwareCounter(NUM_ADDED_SPECS);
+ this.numDeletedSpecs = context.contextAwareCounter(NUM_DELETED_SPECS);
+ this.numUpdatedSpecs = context.contextAwareCounter(NUM_UPDATED_SPECS);
+ // The gauge reports the current number of specs; every read also times the getSpecs() call.
+ this.numActiveSpecs = context.newContextAwareGauge(NUM_ACTIVE_SPECS_NAME, ()->{
+ long startTime = System.currentTimeMillis();
+ int size = specCatalog.getSpecs().size();
+ updateGetSpecTime(startTime);
+ return size;
+ });
+ this.histogramForSpecAdd = context.contextAwareHistogram(HISTOGRAM_FOR_SPEC_ADD, 1, TimeUnit.MINUTES);
+ this.histogramForSpecUpdate = context.contextAwareHistogram(HISTOGRAM_FOR_SPEC_UPDATE, 1, TimeUnit.MINUTES);
+ this.histogramForSpecDelete = context.contextAwareHistogram(HISTOGRAM_FOR_SPEC_DELETE, 1, TimeUnit.MINUTES);
+ }
+
+ /**
+ * Record on {@code timeForSpecCatalogGet} the time elapsed since {@code startTime}.
+ *
+ * @param startTime start of the measured call, as a {@link System#currentTimeMillis()} timestamp
+ */
+ public void updateGetSpecTime(long startTime) {
+ log.info("updateGetSpecTime...");
+ long elapsedMillis = System.currentTimeMillis() - startTime;
+ Instrumented.updateTimer(Optional.of(this.timeForSpecCatalogGet), elapsedMillis, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public Collection<ContextAwareGauge<?>> getGauges() {
+ return Collections.singleton(this.numActiveSpecs);
+ }
- public StandardMetrics(final SpecCatalog parent) {
- this.numAddedSpecs = parent.getMetricContext().contextAwareCounter(NUM_ADDED_SPECS);
- this.numDeletedSpecs = parent.getMetricContext().contextAwareCounter(NUM_DELETED_SPECS);
- this.numUpdatedSpecs = parent.getMetricContext().contextAwareCounter(NUM_UPDATED_SPECS);
- this.numActiveSpecs = parent.getMetricContext().newContextAwareGauge(NUM_ACTIVE_SPECS_NAME,
- new Gauge<Integer>() {
- @Override public Integer getValue() {
- return parent.getSpecs().size();
- }
- });
- parent.addListener(this);
+ @Override
+ public Collection<ContextAwareCounter> getCounters() {
+ return ImmutableList.of(numAddedSpecs, numDeletedSpecs, numUpdatedSpecs);
+ }
+
+ @Override
+ public Collection<ContextAwareHistogram> getHistograms() {
+ return ImmutableList.of(histogramForSpecAdd, histogramForSpecDelete, histogramForSpecUpdate);
+ }
+
+ @Override
+ public Collection<ContextAwareTimer> getTimers() {
+ return ImmutableList.of(this.timeForSpecCatalogGet);
}
@Override public void onAddSpec(Spec addedSpec) {
this.numAddedSpecs.inc();
+ this.histogramForSpecAdd.update(1);
submitTrackingEvent(addedSpec, SPEC_ADDED_OPERATION_TYPE);
}
@@ -100,12 +151,14 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, Instrumentab
@Override
public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
this.numDeletedSpecs.inc();
+ this.histogramForSpecDelete.update(1);
submitTrackingEvent(deletedSpecURI, deletedSpecVersion, SPEC_DELETED_OPERATION_TYPE);
}
@Override
public void onUpdateSpec(Spec updatedSpec) {
this.numUpdatedSpecs.inc();
+ this.histogramForSpecUpdate.update(1);
submitTrackingEvent(updatedSpec, SPEC_UPDATED_OPERATION_TYPE);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java
index be06b1e..8d89b97 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.runtime.job_catalog;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.JobCatalog;
import org.apache.gobblin.runtime.api.JobCatalogWithTemplates;
import org.apache.gobblin.runtime.api.JobTemplate;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
@@ -62,7 +63,7 @@ public class FSJobCatalog extends ImmutableFSJobCatalog implements MutableJobCat
private static final Logger LOGGER = LoggerFactory.getLogger(FSJobCatalog.class);
public static final String CONF_EXTENSION = ".conf";
private static final String FS_SCHEME = "FS";
-
+ private final MutableStandardMetrics mutableMetrics;
/**
* Initialize the JobCatalog, fetch all jobs in jobConfDirPath.
* @param sysConfig
@@ -71,15 +72,24 @@ public class FSJobCatalog extends ImmutableFSJobCatalog implements MutableJobCat
public FSJobCatalog(Config sysConfig)
throws IOException {
super(sysConfig);
+ this.mutableMetrics = (MutableStandardMetrics)metrics;
}
public FSJobCatalog(GobblinInstanceEnvironment env) throws IOException {
super(env);
+ this.mutableMetrics = (MutableStandardMetrics)metrics;
}
public FSJobCatalog(Config sysConfig, Optional<MetricContext> parentMetricContext,
boolean instrumentationEnabled) throws IOException{
super(sysConfig, null, parentMetricContext, instrumentationEnabled);
+ this.mutableMetrics = (MutableStandardMetrics)metrics;
+ }
+
+ /**
+ * Supplies a {@link MutableStandardMetrics} in place of the default metrics object.
+ * NOTE(review): this appears to run while the superclass is still being constructed (the constructors
+ * read {@code metrics} right after {@code super(...)}), so it should not depend on fields of this class — confirm.
+ */
+ @Override
+ protected JobCatalog.StandardMetrics createStandardMetrics() {
+ log.info("create standard metrics {} for {}", MutableStandardMetrics.class.getName(), this.getClass().getName());
+ return new MutableStandardMetrics(this);
}
/**
@@ -94,6 +104,7 @@ public class FSJobCatalog extends ImmutableFSJobCatalog implements MutableJobCat
protected FSJobCatalog(Config sysConfig, PathAlterationObserver observer)
throws IOException {
super(sysConfig, observer);
+ this.mutableMetrics = (MutableStandardMetrics)this.metrics;
}
/**
@@ -108,8 +119,10 @@ public class FSJobCatalog extends ImmutableFSJobCatalog implements MutableJobCat
Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
Preconditions.checkNotNull(jobSpec);
try {
+ long startTime = System.currentTimeMillis();
Path jobSpecPath = getPathForURI(this.jobConfDirPath, jobSpec.getUri());
materializedJobSpec(jobSpecPath, jobSpec, this.fs);
+ this.mutableMetrics.updatePutJobTime(startTime);
} catch (IOException e) {
throw new RuntimeException("When persisting a new JobSpec, unexpected issues happen:" + e.getMessage());
} catch (JobSpecNotFoundException e) {
@@ -126,10 +139,12 @@ public class FSJobCatalog extends ImmutableFSJobCatalog implements MutableJobCat
public synchronized void remove(URI jobURI) {
Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
try {
+ long startTime = System.currentTimeMillis();
Path jobSpecPath = getPathForURI(this.jobConfDirPath, jobURI);
if (fs.exists(jobSpecPath)) {
fs.delete(jobSpecPath, false);
+ this.mutableMetrics.updateRemoveJobTime(startTime);
} else {
LOGGER.warn("No file with URI:" + jobSpecPath + " is found. Deletion failed.");
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java
index ec33226..d40c962 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.runtime.job_catalog;
import java.io.IOException;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -68,7 +69,8 @@ public abstract class JobCatalogBase extends AbstractIdleService implements JobC
MetricContext realParentCtx =
parentMetricContext.or(Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(), getClass()));
this.metricContext = realParentCtx.childBuilder(JobCatalog.class.getSimpleName()).build();
- this.metrics = new StandardMetrics(this);
+ this.metrics = createStandardMetrics();
+ this.addListener(this.metrics);
}
else {
this.metricContext = null;
@@ -76,6 +78,10 @@ public abstract class JobCatalogBase extends AbstractIdleService implements JobC
}
}
+ /**
+ * Factory hook for the metrics object of this catalog; subclasses may override it to return
+ * a more specific {@link StandardMetrics}.
+ *
+ * @return a new {@link StandardMetrics} bound to this catalog
+ */
+ protected StandardMetrics createStandardMetrics() {
+ return new StandardMetrics(this);
+ }
+
@Override
protected void startUp() throws IOException {
notifyAllListeners();
@@ -87,11 +93,19 @@ public abstract class JobCatalogBase extends AbstractIdleService implements JobC
}
protected void notifyAllListeners() {
- for (JobSpec jobSpec : getJobs()) {
+ Collection<JobSpec> jobSpecs = getJobsWithTimeUpdate();
+ for (JobSpec jobSpec : jobSpecs) {
this.listeners.onAddJob(jobSpec);
}
}
+ /**
+ * Fetch all jobs via {@link #getJobs()} and, when metrics are available, record how long the lookup took.
+ *
+ * @return the collection returned by {@link #getJobs()}
+ */
+ private Collection<JobSpec> getJobsWithTimeUpdate() {
+ long startTime = System.currentTimeMillis();
+ Collection<JobSpec> jobSpecs = getJobs();
+ // The metrics object is null when instrumentation is disabled; skip timing instead of failing with an NPE.
+ if (this.metrics != null) {
+ this.metrics.updateGetJobTime(startTime);
+ }
+ return jobSpecs;
+ }
+
/**{@inheritDoc}*/
@Override
public synchronized void addListener(JobCatalogListener jobListener) {
@@ -99,7 +113,7 @@ public abstract class JobCatalogBase extends AbstractIdleService implements JobC
this.listeners.addListener(jobListener);
if (state() == State.RUNNING) {
- for (JobSpec jobSpec : getJobs()) {
+ for (JobSpec jobSpec : getJobsWithTimeUpdate()) {
JobCatalogListener.AddJobCallback addJobCallback = new JobCatalogListener.AddJobCallback(jobSpec);
this.listeners.callbackOneListener(addJobCallback, jobListener);
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/StaticJobCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/StaticJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/StaticJobCatalog.java
index 80d5955..091984f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/StaticJobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/StaticJobCatalog.java
@@ -68,6 +68,9 @@ public class StaticJobCatalog extends JobCatalogBase {
return mapBuilder.build();
}
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+ value = "UR_UNINIT_READ_CALLED_FROM_SUPER_CONSTRUCTOR",
+ justification = "Uninitialized variable has been checked.")
@Override
public void addListener(JobCatalogListener jobListener) {
if (this.jobs == null) {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index ecfe036..482825f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -61,7 +61,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
protected final SpecCatalogListenersList listeners;
protected final Logger log;
protected final MetricContext metricContext;
- protected final FlowCatalog.StandardMetrics metrics;
+ protected final MutableStandardMetrics metrics;
protected final SpecStore specStore;
private final ClassAliasResolver<SpecStore> aliasResolver;
@@ -87,7 +87,8 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
MetricContext realParentCtx =
parentMetricContext.or(Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(), getClass()));
this.metricContext = realParentCtx.childBuilder(FlowCatalog.class.getSimpleName()).build();
- this.metrics = new StandardMetrics(this);
+ this.metrics = new MutableStandardMetrics(this);
+ this.addListener(this.metrics);
}
else {
this.metricContext = null;
@@ -133,7 +134,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
/**************************************************/
protected void notifyAllListeners() {
- for (Spec spec : getSpecs()) {
+ for (Spec spec : getSpecsWithTimeUpdate()) {
this.listeners.onAddSpec(spec);
}
}
@@ -144,7 +145,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
this.listeners.addListener(specListener);
if (state() == State.RUNNING) {
- for (Spec spec : getSpecs()) {
+ for (Spec spec : getSpecsWithTimeUpdate()) {
SpecCatalogListener.AddSpecCallback addJobCallback = new SpecCatalogListener.AddSpecCallback(spec);
this.listeners.callbackOneListener(addJobCallback, specListener);
}
@@ -192,7 +193,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
}
@Override
- public StandardMetrics getMetrics() {
+ public SpecCatalog.StandardMetrics getMetrics() {
return this.metrics;
}
@@ -209,6 +210,17 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
}
}
+ public Collection<Spec> getSpecsWithTimeUpdate() {
+ try {
+ long startTime = System.currentTimeMillis();
+ Collection<Spec> specs = specStore.getSpecs();
+ this.metrics.updateGetSpecTime(startTime);
+ return specs;
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot retrieve Specs from Spec store", e);
+ }
+ }
+
public boolean exists(URI uri) {
try {
return specStore.exists(uri);
@@ -232,9 +244,11 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
Preconditions.checkNotNull(spec);
+ long startTime = System.currentTimeMillis();
log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", spec.getUri(),
((FlowSpec) spec).getConfigAsProperties()));
specStore.addSpec(spec);
+ metrics.updatePutSpecTime(startTime);
this.listeners.onAddSpec(spec);
} catch (IOException e) {
throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
@@ -246,9 +260,10 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
try {
Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
Preconditions.checkNotNull(uri);
-
+ long startTime = System.currentTimeMillis();
log.info(String.format("Removing FlowSpec with URI: %s", uri));
specStore.deleteSpec(uri);
+ this.metrics.updateRemoveSpecTime(startTime);
this.listeners.onDeleteSpec(uri, FlowSpec.Builder.DEFAULT_VERSION);
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
index 7bb8b9c..c334d2b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
@@ -95,6 +95,7 @@ public class TopologyCatalog extends AbstractIdleService implements SpecCatalog,
parentMetricContext.or(Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(), getClass()));
this.metricContext = realParentCtx.childBuilder(TopologyCatalog.class.getSimpleName()).build();
this.metrics = new SpecCatalog.StandardMetrics(this);
+ this.addListener(this.metrics);
}
else {
this.metricContext = null;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index a13ed28..2cbb113 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -17,6 +17,12 @@
package org.apache.gobblin.service.modules.core;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.ContextAwareHistogram;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.Schedule;
import java.io.IOException;
@@ -28,7 +34,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
import lombok.Getter;
import org.apache.commons.cli.CommandLine;
@@ -37,7 +45,6 @@ import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -55,6 +62,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
import com.google.common.eventbus.EventBus;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -94,7 +102,7 @@ import org.apache.gobblin.util.ConfigUtils;
@Alpha
-public class GobblinServiceManager implements ApplicationLauncher {
+public class GobblinServiceManager implements ApplicationLauncher, StandardMetricsBridge{
private static final Logger LOGGER = LoggerFactory.getLogger(GobblinServiceManager.class);
@@ -134,6 +142,9 @@ public class GobblinServiceManager implements ApplicationLauncher {
@Getter
protected Config config;
+ private final MetricContext metricContext;
+ private final Metrics metrics;
+
public GobblinServiceManager(String serviceName, String serviceId, Config config,
Optional<Path> serviceWorkDirOptional) throws Exception {
@@ -143,7 +154,8 @@ public class GobblinServiceManager implements ApplicationLauncher {
properties.setProperty(ServiceBasedAppLauncher.APP_STOP_TIME_SECONDS, Long.toString(300));
}
this.config = config;
-
+ this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.getClass());
+ this.metrics = new Metrics(this.metricContext);
this.serviceId = serviceId;
this.serviceLauncher = new ServiceBasedAppLauncher(properties, serviceName);
@@ -435,6 +447,50 @@ public class GobblinServiceManager implements ApplicationLauncher {
this.serviceLauncher.close();
}
+ @Override
+ public StandardMetrics getStandardMetrics() {
+ return this.metrics;
+ }
+
+ @Nonnull
+ @Override
+ public MetricContext getMetricContext() {
+ return this.metricContext;
+ }
+
+ @Override
+ public boolean isInstrumentationEnabled() {
+ return false;
+ }
+
+ @Override
+ public List<Tag<?>> generateTags(State state) {
+ return null;
+ }
+
+ @Override
+ public void switchMetricContext(List<Tag<?>> tags) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void switchMetricContext(MetricContext context) {
+ throw new UnsupportedOperationException();
+ }
+
+ private class Metrics extends StandardMetrics {
+ public static final String SERVICE_LEADERSHIP_CHANGE = "serviceLeadershipChange";
+ private ContextAwareHistogram serviceLeadershipChange;
+ public Metrics(final MetricContext metricContext) {
+ serviceLeadershipChange = metricContext.contextAwareHistogram(SERVICE_LEADERSHIP_CHANGE, 1, TimeUnit.MINUTES);
+ }
+
+ @Override
+ public Collection<ContextAwareHistogram> getHistograms() {
+ return ImmutableList.of(this.serviceLeadershipChange);
+ }
+ }
+
/**
* A custom {@link MessageHandlerFactory} for {@link ControllerUserDefinedMessageHandler}s that
* handle messages of type {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 0c45daf..5c26445 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -109,7 +109,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
// the onAddSpec will forward specs to the leader, which is itself.
this.isActive = isActive;
if (this.flowCatalog.isPresent()) {
- Collection<Spec> specs = this.flowCatalog.get().getSpecs();
+ Collection<Spec> specs = this.flowCatalog.get().getSpecsWithTimeUpdate();
for (Spec spec : specs) {
onAddSpec(spec);
}