You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/12/07 23:28:24 UTC

incubator-gobblin git commit: [GOBBLIN-326] Add more metrics for GaaS and Gobblin Cluster

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 35b894c2c -> 1be745524


[GOBBLIN-326] Add more metrics for GaaS and Gobblin Cluster

Closes #2178 from yukuai518/timer


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/1be74552
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/1be74552
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/1be74552

Branch: refs/heads/master
Commit: 1be7455246d5c35900c8715c32559bd75b6c7bde
Parents: 35b894c
Author: Kuai Yu <ku...@linkedin.com>
Authored: Thu Dec 7 15:28:16 2017 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Dec 7 15:28:16 2017 -0800

----------------------------------------------------------------------
 .../gobblin/cluster/GobblinClusterManager.java  | 68 +++++++++++++++-
 .../cluster/GobblinHelixJobScheduler.java       | 85 +++++++++++---------
 .../instrumented/StandardMetricsBridge.java     | 35 ++++++--
 .../gobblin/metrics/ContextAwareHistogram.java  |  8 ++
 .../metrics/ContextAwareMetricFactory.java      | 23 +++++-
 .../metrics/ContextAwareMetricFactoryArgs.java  | 47 +++++++++++
 .../gobblin/metrics/ContextAwareTimer.java      |  9 +++
 .../apache/gobblin/metrics/InnerHistogram.java  | 17 ++++
 .../gobblin/metrics/InnerMetricContext.java     | 17 ++++
 .../org/apache/gobblin/metrics/InnerTimer.java  | 14 ++++
 .../apache/gobblin/metrics/MetricContext.java   | 27 ++++---
 .../apache/gobblin/runtime/api/JobCatalog.java  | 77 ++++++++++--------
 .../runtime/api/JobExecutionLauncher.java       |  8 ++
 .../gobblin/runtime/api/MutableJobCatalog.java  | 43 ++++++++++
 .../gobblin/runtime/api/MutableSpecCatalog.java | 42 ++++++++++
 .../apache/gobblin/runtime/api/SpecCatalog.java | 85 ++++++++++++++++----
 .../runtime/job_catalog/FSJobCatalog.java       | 17 +++-
 .../runtime/job_catalog/JobCatalogBase.java     | 20 ++++-
 .../runtime/job_catalog/StaticJobCatalog.java   |  3 +
 .../runtime/spec_catalog/FlowCatalog.java       | 27 +++++--
 .../runtime/spec_catalog/TopologyCatalog.java   |  1 +
 .../modules/core/GobblinServiceManager.java     | 62 +++++++++++++-
 .../scheduler/GobblinServiceJobScheduler.java   |  2 +-
 23 files changed, 615 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 7948a8a..6b53c6c 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -20,6 +20,8 @@ package org.apache.gobblin.cluster;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -35,6 +37,12 @@ import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.ContextAwareHistogram;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricContext;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -83,9 +91,9 @@ import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
 import org.apache.gobblin.scheduler.SchedulerService;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.JvmUtils;
-import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
+import javax.annotation.Nonnull;
 import lombok.Getter;
 
 
@@ -112,7 +120,7 @@ import lombok.Getter;
  * @author Yinan Li
  */
 @Alpha
-public class GobblinClusterManager implements ApplicationLauncher {
+public class GobblinClusterManager implements ApplicationLauncher, StandardMetricsBridge {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(GobblinClusterManager.class);
 
@@ -148,12 +156,14 @@ public class GobblinClusterManager implements ApplicationLauncher {
   private GobblinHelixJobScheduler jobScheduler;
   private final String clusterName;
   private final Config config;
-
+  private final MetricContext metricContext;
+  private final Metrics metrics;
   public GobblinClusterManager(String clusterName, String applicationId, Config config,
       Optional<Path> appWorkDirOptional) throws Exception {
     this.clusterName = clusterName;
     this.config = config;
-
+    this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.getClass());
+    this.metrics = new Metrics(this.metricContext);
     this.isStandaloneMode = ConfigUtils.getBoolean(config, GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE_KEY,
         GobblinClusterConfigurationKeys.DEFAULT_STANDALONE_CLUSTER_MODE);
 
@@ -239,6 +249,7 @@ public class GobblinClusterManager implements ApplicationLauncher {
    */
   @VisibleForTesting
   void handleLeadershipChange(NotificationContext changeContext) {
+    this.metrics.clusterLeadershipChange.update(1);
     if (this.helixManager.isLeader()) {
       // can get multiple notifications on a leadership change, so only start the application launcher the first time
       // the notification is received
@@ -531,6 +542,37 @@ public class GobblinClusterManager implements ApplicationLauncher {
     this.applicationLauncher.close();
   }
 
+  @Override
+  public StandardMetrics getStandardMetrics() {
+    return this.metrics;
+  }
+
+  @Nonnull
+  @Override
+  public MetricContext getMetricContext() {
+    return this.metricContext;
+  }
+
+  @Override
+  public boolean isInstrumentationEnabled() {
+    return GobblinMetrics.isEnabled(ConfigUtils.configToProperties(this.config));
+  }
+
+  @Override
+  public List<Tag<?>> generateTags(State state) {
+    return ImmutableList.of();
+  }
+
+  @Override
+  public void switchMetricContext(List<Tag<?>> tags) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void switchMetricContext(MetricContext context) {
+    throw new UnsupportedOperationException();
+  }
+
   /**
    * A custom implementation of {@link LiveInstanceChangeListener}.
    */
@@ -544,6 +586,24 @@ public class GobblinClusterManager implements ApplicationLauncher {
     }
   }
 
+  private class Metrics extends StandardMetrics {
+    public static final String CLUSTER_LEADERSHIP_CHANGE = "clusterLeadershipChange";
+    private ContextAwareHistogram clusterLeadershipChange;
+    public Metrics(final MetricContext metricContext) {
+      clusterLeadershipChange = metricContext.contextAwareHistogram(CLUSTER_LEADERSHIP_CHANGE, 1, TimeUnit.MINUTES);
+    }
+
+    @Override
+    public String getName() {
+      return GobblinClusterManager.class.getName();
+    }
+
+    @Override
+    public Collection<ContextAwareHistogram> getHistograms() {
+      return ImmutableList.of(this.clusterLeadershipChange);
+    }
+  }
+
   /**
    * A custom {@link MessageHandlerFactory} for {@link MessageHandler}s that handle messages of type
    * "SHUTDOWN" for shutting down the controller.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 3a25df7..9fd5add 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.cluster;
 
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -28,12 +27,13 @@ import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.ContextAwareHistogram;
 import org.apache.hadoop.fs.Path;
 import org.apache.helix.HelixManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.codahale.metrics.Gauge;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -50,8 +50,6 @@ import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.instrumented.StandardMetricsBridge;
 import org.apache.gobblin.metrics.ContextAwareCounter;
 import org.apache.gobblin.metrics.ContextAwareGauge;
-import org.apache.gobblin.metrics.ContextAwareHistogram;
-import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.metrics.ContextAwareTimer;
 import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.metrics.MetricContext;
@@ -69,7 +67,6 @@ import org.apache.gobblin.scheduler.SchedulerService;
 
 
 import javax.annotation.Nonnull;
-import lombok.AllArgsConstructor;
 
 
 /**
@@ -96,7 +93,7 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
   private final ConcurrentHashMap<String, Boolean> jobRunningMap;
   private final MutableJobCatalog jobCatalog;
   private final MetricContext metricContext;
-  private final InnerStandardMetrics metrics;
+  private final Metrics metrics;
 
   public GobblinHelixJobScheduler(Properties properties, HelixManager helixManager, EventBus eventBus,
       Path appWorkDir, List<? extends Tag<?>> metadataTags, SchedulerService schedulerService,
@@ -109,16 +106,8 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
     this.appWorkDir = appWorkDir;
     this.metadataTags = metadataTags;
     this.jobCatalog = jobCatalog;
-    this.metricContext = getDefaultMetricContext(properties);
-    this.metrics = new InnerStandardMetrics(this.metricContext);
-  }
-
-  public MetricContext getDefaultMetricContext(Properties properties) {
-    org.apache.gobblin.configuration.State fakeState =
-        new org.apache.gobblin.configuration.State(properties);
-    List<Tag<?>> tags = new ArrayList<>();
-    MetricContext res = Instrumented.getMetricContext(fakeState, GobblinHelixJobScheduler.class, tags);
-    return res;
+    this.metricContext = Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(properties), this.getClass());
+    this.metrics = new Metrics(this.metricContext);
   }
 
   @Nonnull
@@ -148,36 +137,61 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
   }
 
   @Override
-  public StandardMetricsBridge.StandardMetrics getStandardMetrics() {
+  public StandardMetrics getStandardMetrics() {
     return metrics;
   }
 
-  private class InnerStandardMetrics implements StandardMetrics {
+  private class Metrics extends StandardMetrics {
 
     private final ContextAwareCounter numJobsLaunched;
     private final ContextAwareCounter numJobsCompleted;
     private final ContextAwareCounter numJobsCommitted;
     private final ContextAwareCounter numJobsFailed;
     private final ContextAwareCounter numJobsCancelled;
+    private final ContextAwareHistogram histogramJobsLaunched;
+    private final ContextAwareHistogram histogramJobsCompleted;
+    private final ContextAwareHistogram histogramJobsCommitted;
+    private final ContextAwareHistogram histogramJobsFailed;
+    private final ContextAwareHistogram histogramJobsCancelled;
+
     private final ContextAwareGauge<Integer> numJobsRunning;
     private final ContextAwareTimer timeForJobCompletion;
     private final ContextAwareTimer timeForJobFailure;
+    private final ContextAwareTimer timeBeforeJobScheduling;
+    private final ContextAwareTimer timeBeforeJobLaunching;
 
-    public InnerStandardMetrics(final MetricContext metricContext) {
+    public Metrics(final MetricContext metricContext) {
+      // All historical counters
       this.numJobsLaunched = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_LAUNCHED_COUNTER);
       this.numJobsCompleted = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMPLETED_COUNTER);
       this.numJobsCommitted = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMMITTED_COUNTER);
       this.numJobsFailed = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_FAILED_COUNTER);
       this.numJobsCancelled = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_CANCELLED_COUNTER);
+
+      // Histograms over a sliding time window of the last 1 minute
+      this.histogramJobsLaunched = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_LAUNCHED_HISTOGRAM, 1, TimeUnit.MINUTES);
+      this.histogramJobsCompleted = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMPLETED_HISTOGRAM, 1, TimeUnit.MINUTES);
+      this.histogramJobsCommitted = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMMITTED_HISTOGRAM, 1, TimeUnit.MINUTES);
+      this.histogramJobsFailed = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_FAILED_HISTOGRAM, 1, TimeUnit.MINUTES);
+      this.histogramJobsCancelled = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_CANCELLED_HISTOGRAM, 1, TimeUnit.MINUTES);
+
       this.numJobsRunning = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_RUNNING_GAUGE,
-          new Gauge<Integer>() {
-            @Override public Integer getValue() {
-              return (int)(InnerStandardMetrics.this.numJobsLaunched.getCount() -
-                  InnerStandardMetrics.this.numJobsCompleted.getCount());
-            }
-          });
-      this.timeForJobCompletion = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_COMPLETION);
-      this.timeForJobFailure = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_FAILURE);
+          ()->(int)(Metrics.this.numJobsLaunched.getCount() - Metrics.this.numJobsCompleted.getCount()));
+
+      this.timeForJobCompletion = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_COMPLETION, 1, TimeUnit.MINUTES);
+      this.timeForJobFailure = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_FAILURE,1, TimeUnit.MINUTES);
+      this.timeBeforeJobScheduling = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BEFORE_JOB_SCHEDULING, 1, TimeUnit.MINUTES);
+      this.timeBeforeJobLaunching = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BEFORE_JOB_LAUNCHING, 1, TimeUnit.MINUTES);
+    }
+
+    private void updateTimeBeforeJobScheduling (Properties jobConfig) {
+      long jobCreationTime = Long.parseLong(jobConfig.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "0"));
+      Instrumented.updateTimer(Optional.of(timeBeforeJobScheduling), System.currentTimeMillis() - jobCreationTime, TimeUnit.MILLISECONDS);
+    }
+
+    private void updateTimeBeforeJobLaunching (Properties jobConfig) {
+      long jobCreationTime = Long.parseLong(jobConfig.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "0"));
+      Instrumented.updateTimer(Optional.of(timeBeforeJobLaunching), System.currentTimeMillis() - jobCreationTime, TimeUnit.MILLISECONDS);
     }
 
     @Override
@@ -202,25 +216,20 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
     }
 
     @Override
-    public Collection<ContextAwareMeter> getMeters() {
-      return null;
-    }
-
-    @Override
     public Collection<ContextAwareTimer> getTimers() {
-      return ImmutableList.of(timeForJobCompletion, timeForJobFailure);
+      return ImmutableList.of(timeForJobCompletion, timeForJobFailure, timeBeforeJobScheduling, timeBeforeJobLaunching);
     }
 
     @Override
     public Collection<ContextAwareHistogram> getHistograms() {
-      return null;
+      return ImmutableList.of(histogramJobsCompleted, histogramJobsLaunched, histogramJobsFailed, histogramJobsCancelled, histogramJobsCommitted);
     }
   }
 
   private class MetricsTrackingListener extends AbstractJobListener {
-    private final InnerStandardMetrics metrics;
+    private final Metrics metrics;
     private static final String START_TIME = "startTime";
-    MetricsTrackingListener(InnerStandardMetrics metrics) {
+    MetricsTrackingListener(Metrics metrics) {
       this.metrics = metrics;
     }
 
@@ -308,6 +317,9 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
       Properties jobConfig = new Properties();
       jobConfig.putAll(this.properties);
       jobConfig.putAll(newJobArrival.getJobConfig());
+
+      metrics.updateTimeBeforeJobScheduling(jobConfig);
+
       if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
         LOGGER.info("Scheduling job " + newJobArrival.getJobName());
         scheduleJob(jobConfig, new MetricsTrackingListener(metrics));
@@ -365,6 +377,7 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
     @Override
     public void run() {
       try {
+        ((MetricsTrackingListener)jobListener).metrics.updateTimeBeforeJobLaunching(this.jobConfig);
         GobblinHelixJobScheduler.this.runJob(this.jobConfig, this.jobListener);
 
         // remove non-scheduled job catalog once done so it won't be re-executed

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java
index 087cf39..3993dce 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java
@@ -25,6 +25,9 @@ import org.apache.gobblin.metrics.ContextAwareHistogram;
 import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.metrics.ContextAwareTimer;
 
+import com.google.common.collect.ImmutableList;
+
+
 /**
  * This interface indicates a class will expose its metrics to some external systems.
  */
@@ -32,12 +35,30 @@ public interface StandardMetricsBridge extends Instrumentable {
 
   StandardMetrics getStandardMetrics();
 
-  interface StandardMetrics {
-    String getName();
-    Collection<ContextAwareGauge<?>> getGauges();
-    Collection<ContextAwareCounter> getCounters();
-    Collection<ContextAwareMeter> getMeters();
-    Collection<ContextAwareTimer> getTimers();
-    Collection<ContextAwareHistogram> getHistograms();
+  public class StandardMetrics {
+
+    public String getName() {
+      return this.getClass().getName();
+    }
+
+    public Collection<ContextAwareGauge<?>> getGauges() {
+      return ImmutableList.of();
+    }
+
+    public Collection<ContextAwareCounter> getCounters() {
+      return ImmutableList.of();
+    }
+
+    public Collection<ContextAwareMeter> getMeters() {
+      return ImmutableList.of();
+    }
+
+    public Collection<ContextAwareTimer> getTimers() {
+      return ImmutableList.of();
+    }
+
+    public Collection<ContextAwareHistogram> getHistograms() {
+      return ImmutableList.of();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareHistogram.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareHistogram.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareHistogram.java
index a940715..a45f928 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareHistogram.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareHistogram.java
@@ -17,10 +17,13 @@
 
 package org.apache.gobblin.metrics;
 
+import java.util.concurrent.TimeUnit;
+
 import lombok.experimental.Delegate;
 
 import com.codahale.metrics.ExponentiallyDecayingReservoir;
 import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingTimeWindowReservoir;
 
 import org.apache.gobblin.metrics.metric.InnerMetric;
 
@@ -52,7 +55,12 @@ public class ContextAwareHistogram extends Histogram implements ContextAwareMetr
     super(new ExponentiallyDecayingReservoir());
     this.innerHistogram = new InnerHistogram(context, name, this);
     this.context = context;
+  }
 
+  ContextAwareHistogram(MetricContext context, String name, long windowSize, TimeUnit unit) {
+    super(new SlidingTimeWindowReservoir(windowSize, unit));
+    this.innerHistogram = new InnerHistogram(context, name, this, windowSize, unit);
+    this.context = context;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareMetricFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareMetricFactory.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareMetricFactory.java
index d699b06..e9f6d62 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareMetricFactory.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareMetricFactory.java
@@ -23,7 +23,6 @@ import com.codahale.metrics.Meter;
 import com.codahale.metrics.Metric;
 import com.codahale.metrics.Timer;
 
-
 /**
  * An interface for factory classes for {@link ContextAwareMetric}s.
  *
@@ -49,6 +48,10 @@ public interface ContextAwareMetricFactory<T extends ContextAwareMetric> {
    */
   public T newMetric(MetricContext context, String name);
 
+  default public T newMetric(ContextAwareMetricFactoryArgs args) {
+    return null;
+  }
+
   /**
    * Check if a given metric is an instance of the type of context-aware metrics created by this
    * {@link ContextAwareMetricFactory}.
@@ -102,6 +105,15 @@ public interface ContextAwareMetricFactory<T extends ContextAwareMetric> {
     }
 
     @Override
+    public ContextAwareHistogram newMetric(ContextAwareMetricFactoryArgs args) {
+      if (args instanceof ContextAwareMetricFactoryArgs.SlidingTimeWindowArgs) {
+        ContextAwareMetricFactoryArgs.SlidingTimeWindowArgs windowArgs = (ContextAwareMetricFactoryArgs.SlidingTimeWindowArgs)args;
+        return new ContextAwareHistogram(windowArgs.getContext(), windowArgs.getName(), windowArgs.getWindowSize(), windowArgs.getUnit());
+      }
+      throw new UnsupportedOperationException("Unknown factory arguments to create ContextAwareHistogram");
+    }
+
+    @Override
     public boolean isInstance(Metric metric) {
       return Histogram.class.isInstance(metric);
     }
@@ -118,6 +130,15 @@ public interface ContextAwareMetricFactory<T extends ContextAwareMetric> {
     }
 
     @Override
+    public ContextAwareTimer newMetric(ContextAwareMetricFactoryArgs args) {
+      if (args instanceof ContextAwareMetricFactoryArgs.SlidingTimeWindowArgs) {
+        ContextAwareMetricFactoryArgs.SlidingTimeWindowArgs windowArgs = (ContextAwareMetricFactoryArgs.SlidingTimeWindowArgs)args;
+        return new ContextAwareTimer(windowArgs.getContext(), windowArgs.getName(), windowArgs.getWindowSize(), windowArgs.getUnit());
+      }
+      throw new UnsupportedOperationException("Unknown factory arguments to create ContextAwareTimer");
+    }
+
+    @Override
     public boolean isInstance(Metric metric) {
       return Timer.class.isInstance(metric);
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareMetricFactoryArgs.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareMetricFactoryArgs.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareMetricFactoryArgs.java
new file mode 100644
index 0000000..1e7e077
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareMetricFactoryArgs.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics;
+
+import java.util.concurrent.TimeUnit;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+
+/**
+ * A class which wraps all arguments required by {@link ContextAwareMetricFactory}s.
+ *
+ * A concrete {@link ContextAwareMetricFactory} is expected to check for, and downcast to, the argument sub-type it supports.
+ */
+@AllArgsConstructor
+@Getter
+public class ContextAwareMetricFactoryArgs {
+  protected final MetricContext context;
+  protected final String name;
+
+  @Getter
+  public static class SlidingTimeWindowArgs extends  ContextAwareMetricFactoryArgs {
+    protected final long windowSize;
+    protected final TimeUnit unit;
+    public SlidingTimeWindowArgs(MetricContext context, String name, long windowSize, TimeUnit unit) {
+      super(context, name);
+      this.windowSize = windowSize;
+      this.unit = unit;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareTimer.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareTimer.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareTimer.java
index 33d68ed..839df20 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareTimer.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/ContextAwareTimer.java
@@ -17,6 +17,9 @@
 
 package org.apache.gobblin.metrics;
 
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.SlidingTimeWindowReservoir;
 import com.codahale.metrics.Timer;
 
 import org.apache.gobblin.metrics.metric.InnerMetric;
@@ -51,6 +54,12 @@ public class ContextAwareTimer extends Timer implements ContextAwareMetric {
     this.context = context;
   }
 
+  ContextAwareTimer(MetricContext context, String name, long windowSize, TimeUnit unit) {
+    super(new SlidingTimeWindowReservoir(windowSize, unit));
+    this.innerTimer = new InnerTimer(context, name, this, windowSize, unit);
+    this.context = context;
+  }
+
   @Override
   public MetricContext getContext() {
     return this.context;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerHistogram.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerHistogram.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerHistogram.java
index 7fb5401..66f8fd2 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerHistogram.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerHistogram.java
@@ -18,9 +18,11 @@
 package org.apache.gobblin.metrics;
 
 import java.lang.ref.WeakReference;
+import java.util.concurrent.TimeUnit;
 
 import com.codahale.metrics.ExponentiallyDecayingReservoir;
 import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingTimeWindowReservoir;
 import com.google.common.base.Optional;
 
 import org.apache.gobblin.metrics.metric.InnerMetric;
@@ -50,6 +52,21 @@ public class InnerHistogram extends Histogram implements InnerMetric {
     this.contextAwareHistogram = new WeakReference<>(contextAwareHistogram);
   }
 
+  InnerHistogram(MetricContext context, String name, ContextAwareHistogram contextAwareHistogram, long windowSize, TimeUnit unit) {
+    super(new SlidingTimeWindowReservoir(windowSize, unit));
+
+    this.name = name;
+
+    Optional<MetricContext> parentContext = context.getParent();
+    if (parentContext.isPresent()) {
+      this.parentHistogram = Optional.fromNullable(parentContext.get().contextAwareHistogram(name, windowSize, unit));
+    } else {
+      this.parentHistogram = Optional.absent();
+    }
+
+    this.contextAwareHistogram = new WeakReference<>(contextAwareHistogram);
+  }
+
   @Override
   public void update(int value) {
     update((long) value);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java
index 70502ec..2738974 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerMetricContext.java
@@ -368,6 +368,23 @@ public class InnerMetricContext extends MetricRegistry implements ReportableCont
     return newMetric;
   }
 
+  @SuppressWarnings("unchecked")
+  protected synchronized <T extends ContextAwareMetric> T getOrCreate(
+      ContextAwareMetricFactory<T> factory, ContextAwareMetricFactoryArgs args) {
+    String name = args.getName();
+    InnerMetric metric = this.contextAwareMetrics.get(name);
+    if (metric != null) {
+      if (factory.isInstance(metric)) {
+        return (T) metric.getContextAwareMetric();
+      }
+      throw new IllegalArgumentException(name + " is already used for a different type of metric");
+    }
+
+    T newMetric = factory.newMetric(args);
+    this.register(name, newMetric);
+    return newMetric;
+  }
+
   private boolean removeChildrenMetrics(String name) {
     boolean removed = true;
     for (MetricContext child : getChildContextsAsMap().values()) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerTimer.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerTimer.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerTimer.java
index db4fc0a..be3517f 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerTimer.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/InnerTimer.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.metrics;
 import java.lang.ref.WeakReference;
 import java.util.concurrent.TimeUnit;
 
+import com.codahale.metrics.SlidingTimeWindowReservoir;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Optional;
 
@@ -47,6 +48,19 @@ public class InnerTimer extends Timer implements InnerMetric {
     this.timer = new WeakReference<>(contextAwareTimer);
   }
 
+  InnerTimer(MetricContext context, String name, ContextAwareTimer contextAwareTimer, long windowSize, TimeUnit unit) {
+    super(new SlidingTimeWindowReservoir(windowSize, unit));
+    this.name = name;
+
+    Optional<MetricContext> parentContext = context.getParent();
+    if (parentContext.isPresent()) {
+      this.parentTimer = Optional.fromNullable(parentContext.get().contextAwareTimer(name, windowSize, unit));
+    } else {
+      this.parentTimer = Optional.absent();
+    }
+    this.timer = new WeakReference<>(contextAwareTimer);
+  }
+
   @Override
   public void update(long duration, TimeUnit unit) {
     super.update(duration, unit);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
index dcc1029..46f8ab1 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
@@ -465,19 +465,21 @@ public class MetricContext extends MetricRegistry implements ReportableContext,
    * @return the {@link ContextAwareHistogram} with the given name
    */
   public ContextAwareHistogram contextAwareHistogram(String name) {
-    return contextAwareHistogram(name, ContextAwareMetricFactory.DEFAULT_CONTEXT_AWARE_HISTOGRAM_FACTORY);
+    return this.innerMetricContext.getOrCreate(name, ContextAwareMetricFactory.DEFAULT_CONTEXT_AWARE_HISTOGRAM_FACTORY);
   }
 
   /**
-   * Get a {@link ContextAwareHistogram} with a given name.
+   * Get a {@link ContextAwareHistogram} with a given name that uses a {@link com.codahale.metrics.SlidingTimeWindowReservoir} with the given time window.
    *
    * @param name name of the {@link ContextAwareHistogram}
-   * @param factory a {@link ContextAwareMetricFactory} for building {@link ContextAwareHistogram}s
+   * @param windowSize the duration of the sliding time window, expressed in {@code unit}
+   * @param unit the time unit of {@code windowSize}
    * @return the {@link ContextAwareHistogram} with the given name
    */
-  public ContextAwareHistogram contextAwareHistogram(String name,
-      ContextAwareMetricFactory<ContextAwareHistogram> factory) {
-    return this.innerMetricContext.getOrCreate(name, factory);
+  public ContextAwareHistogram contextAwareHistogram(String name, long windowSize, TimeUnit unit) {
+    ContextAwareMetricFactoryArgs.SlidingTimeWindowArgs args = new ContextAwareMetricFactoryArgs.SlidingTimeWindowArgs(
+        this.innerMetricContext.getMetricContext().get(), name, windowSize, unit);
+    return this.innerMetricContext.getOrCreate(ContextAwareMetricFactory.DEFAULT_CONTEXT_AWARE_HISTOGRAM_FACTORY, args);
   }
 
   /**
@@ -487,18 +489,21 @@ public class MetricContext extends MetricRegistry implements ReportableContext,
    * @return the {@link ContextAwareTimer} with the given name
    */
   public ContextAwareTimer contextAwareTimer(String name) {
-    return contextAwareTimer(name, ContextAwareMetricFactory.DEFAULT_CONTEXT_AWARE_TIMER_FACTORY);
+    return this.innerMetricContext.getOrCreate(name, ContextAwareMetricFactory.DEFAULT_CONTEXT_AWARE_TIMER_FACTORY);
   }
 
   /**
-   * Get a {@link ContextAwareTimer} with a given name.
+   * Get a {@link ContextAwareTimer} with a given name that uses a {@link com.codahale.metrics.SlidingTimeWindowReservoir} with the given time window.
    *
    * @param name name of the {@link ContextAwareTimer}
-   * @param factory a {@link ContextAwareMetricFactory} for building {@link ContextAwareTimer}s
+   * @param windowSize the duration of the sliding time window, expressed in {@code unit}
+   * @param unit the time unit of {@code windowSize}
    * @return the {@link ContextAwareTimer} with the given name
    */
-  public ContextAwareTimer contextAwareTimer(String name, ContextAwareMetricFactory<ContextAwareTimer> factory) {
-    return this.innerMetricContext.getOrCreate(name, factory);
+  public ContextAwareTimer contextAwareTimer(String name, long windowSize, TimeUnit unit) {
+    ContextAwareMetricFactoryArgs.SlidingTimeWindowArgs args = new ContextAwareMetricFactoryArgs.SlidingTimeWindowArgs(
+        this.innerMetricContext.getMetricContext().get(), name, windowSize, unit);
+    return this.innerMetricContext.getOrCreate(ContextAwareMetricFactory.DEFAULT_CONTEXT_AWARE_TIMER_FACTORY, args);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
index df1dc44..6c0ea5b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
@@ -17,28 +17,30 @@
 package org.apache.gobblin.runtime.api;
 
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import com.codahale.metrics.Gauge;
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.Service;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.instrumented.GobblinMetricsKeys;
 import org.apache.gobblin.instrumented.Instrumentable;
+import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.instrumented.StandardMetricsBridge;
 import org.apache.gobblin.metrics.ContextAwareCounter;
 import org.apache.gobblin.metrics.ContextAwareGauge;
 import org.apache.gobblin.metrics.ContextAwareHistogram;
-import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.metrics.ContextAwareTimer;
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
 
 import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
 
 /**
  * A catalog of all the {@link JobSpec}s a Gobblin instance is currently aware of.
@@ -50,10 +52,9 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable
 
   /** Metrics for the job catalog; null if
    * ({@link #isInstrumentationEnabled()}) is false. */
-  StandardMetrics getMetrics();
+  JobCatalog.StandardMetrics getMetrics();
 
-  @Override
-  default StandardMetrics getStandardMetrics() {
+  default StandardMetricsBridge.StandardMetrics getStandardMetrics() {
     return getMetrics();
   }
 
@@ -63,11 +64,16 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable
    **/
   JobSpec getJobSpec(URI uri) throws JobSpecNotFoundException;
 
-  public static class StandardMetrics implements JobCatalogListener, StandardMetricsBridge.StandardMetrics {
+  @Slf4j
+  public static class StandardMetrics extends StandardMetricsBridge.StandardMetrics implements JobCatalogListener {
     public static final String NUM_ACTIVE_JOBS_NAME = "numActiveJobs";
     public static final String NUM_ADDED_JOBS = "numAddedJobs";
     public static final String NUM_DELETED_JOBS = "numDeletedJobs";
     public static final String NUM_UPDATED_JOBS = "numUpdatedJobs";
+    public static final String TIME_FOR_JOB_CATALOG_GET = "timeForJobCatalogGet";
+    public static final String HISTOGRAM_FOR_JOB_ADD = "histogramForJobAdd";
+    public static final String HISTOGRAM_FOR_JOB_UPDATE = "histogramForJobUpdate";
+    public static final String HISTOGRAM_FOR_JOB_DELETE = "histogramForJobDelete";
     public static final String TRACKING_EVENT_NAME = "JobCatalogEvent";
     public static final String JOB_ADDED_OPERATION_TYPE = "JobAdded";
     public static final String JOB_DELETED_OPERATION_TYPE = "JobDeleted";
@@ -77,22 +83,36 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable
     @Getter private final ContextAwareCounter numAddedJobs;
     @Getter private final ContextAwareCounter numDeletedJobs;
     @Getter private final ContextAwareCounter numUpdatedJobs;
-
-    public StandardMetrics(final JobCatalog parent) {
-      this.numAddedJobs = parent.getMetricContext().contextAwareCounter(NUM_ADDED_JOBS);
-      this.numDeletedJobs = parent.getMetricContext().contextAwareCounter(NUM_DELETED_JOBS);
-      this.numUpdatedJobs = parent.getMetricContext().contextAwareCounter(NUM_UPDATED_JOBS);
-      this.numActiveJobs = parent.getMetricContext().newContextAwareGauge(NUM_ACTIVE_JOBS_NAME,
-          new Gauge<Integer>() {
-            @Override public Integer getValue() {
-              return parent.getJobs().size();
-            }
+    @Getter private final ContextAwareTimer timeForJobCatalogGet;
+    @Getter private final ContextAwareHistogram histogramForJobAdd;
+    @Getter private final ContextAwareHistogram histogramForJobUpdate;
+    @Getter private final ContextAwareHistogram histogramForJobDelete;
+
+    public StandardMetrics(final JobCatalog jobCatalog) {
+      MetricContext context = jobCatalog.getMetricContext();
+      this.timeForJobCatalogGet = context.contextAwareTimer(TIME_FOR_JOB_CATALOG_GET, 1, TimeUnit.MINUTES);
+      this.numAddedJobs = context.contextAwareCounter(NUM_ADDED_JOBS);
+      this.numDeletedJobs = context.contextAwareCounter(NUM_DELETED_JOBS);
+      this.numUpdatedJobs = context.contextAwareCounter(NUM_UPDATED_JOBS);
+      this.numActiveJobs = context.newContextAwareGauge(NUM_ACTIVE_JOBS_NAME, ()->{
+          long startTime = System.currentTimeMillis();
+          int size = jobCatalog.getJobs().size();
+          updateGetJobTime(startTime);
+          return size;
       });
-      parent.addListener(this);
+      this.histogramForJobAdd = context.contextAwareHistogram(HISTOGRAM_FOR_JOB_ADD, 1, TimeUnit.MINUTES);
+      this.histogramForJobUpdate = context.contextAwareHistogram(HISTOGRAM_FOR_JOB_UPDATE, 1, TimeUnit.MINUTES);
+      this.histogramForJobDelete = context.contextAwareHistogram(HISTOGRAM_FOR_JOB_DELETE, 1, TimeUnit.MINUTES);
+    }
+
+    public void updateGetJobTime(long startTime) {
+      log.info("updateGetJobTime...");
+      Instrumented.updateTimer(Optional.of(this.timeForJobCatalogGet), System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
     }
 
     @Override public void onAddJob(JobSpec addedJob) {
       this.numAddedJobs.inc();
+      this.histogramForJobAdd.update(1);
       submitTrackingEvent(addedJob, JOB_ADDED_OPERATION_TYPE);
     }
 
@@ -116,44 +136,35 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable
     @Override
     public void onDeleteJob(URI deletedJobURI, String deletedJobVersion) {
       this.numDeletedJobs.inc();
+      this.histogramForJobDelete.update(1);
       submitTrackingEvent(deletedJobURI, deletedJobVersion, JOB_DELETED_OPERATION_TYPE);
     }
 
     @Override
     public void onUpdateJob(JobSpec updatedJob) {
       this.numUpdatedJobs.inc();
+      this.histogramForJobUpdate.update(1);
       submitTrackingEvent(updatedJob, JOB_UPDATED_OPERATION_TYPE);
     }
 
     @Override
-    public String getName() {
-      return "JobCatalog";
-    }
-
-    @Override
     public Collection<ContextAwareGauge<?>> getGauges() {
       return Collections.singleton(this.numActiveJobs);
     }
 
     @Override
     public Collection<ContextAwareCounter> getCounters() {
-      List<ContextAwareCounter> counters = ImmutableList.of(numAddedJobs, numDeletedJobs, numDeletedJobs);
-      return counters;
-    }
-
-    @Override
-    public Collection<ContextAwareMeter> getMeters() {
-      return null;
+      return ImmutableList.of(numAddedJobs, numDeletedJobs, numUpdatedJobs);
     }
 
     @Override
     public Collection<ContextAwareTimer> getTimers() {
-      return null;
+      return ImmutableList.of(timeForJobCatalogGet);
     }
 
     @Override
     public Collection<ContextAwareHistogram> getHistograms() {
-      return null;
+      return ImmutableList.of(histogramForJobAdd, histogramForJobDelete, histogramForJobUpdate);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
index e63cb4a..fb54139 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
@@ -43,8 +43,16 @@ public interface JobExecutionLauncher extends Instrumentable {
     public static final String NUM_JOBS_FAILED_COUNTER = "numJobsFailed";
     public static final String NUM_JOBS_CANCELLED_COUNTER = "numJobsCancelled";
     public static final String NUM_JOBS_RUNNING_GAUGE = "numJobsRunning";
+    public static final String NUM_JOBS_LAUNCHED_HISTOGRAM = "histogramJobsLaunched";
+    public static final String NUM_JOBS_COMPLETED_HISTOGRAM = "histogramJobsCompleted";
+    public static final String NUM_JOBS_COMMITTED_HISTOGRAM = "histogramJobsCommitted";
+    public static final String NUM_JOBS_FAILED_HISTOGRAM = "histogramJobsFailed";
+    public static final String NUM_JOBS_CANCELLED_HISTOGRAM = "histogramJobsCancelled";
+
     public static final String TIMER_FOR_JOB_COMPLETION = "timerForJobCompletion";
     public static final String TIMER_FOR_JOB_FAILURE = "timerForJobFailure";
+    public static final String TIMER_BEFORE_JOB_SCHEDULING = "timerBeforeJobScheduling";
+    public static final String TIMER_BEFORE_JOB_LAUNCHING = "timerBeforeJobLaunching";
 
     public static final String TRACKING_EVENT_NAME = "JobExecutionLauncherEvent";
     public static final String JOB_EXECID_META = "jobExecId";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java
index b5c82b7..8b6e98c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java
@@ -17,8 +17,19 @@
 package org.apache.gobblin.runtime.api;
 
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareTimer;
+
+import com.google.common.base.Optional;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
 
 /**
  * A {@link JobCatalog} that can have its {@link Collection} of {@link JobSpec}s modified
@@ -37,4 +48,36 @@ public interface MutableJobCatalog extends JobCatalog {
    * Removes an existing JobSpec with the given URI. A no-op if such JobSpec does not exist.
    */
   void remove(URI uri);
+
+  @Slf4j
+  public static class MutableStandardMetrics extends JobCatalog.StandardMetrics {
+    public static final String TIME_FOR_JOB_CATALOG_REMOVE = "timeForJobCatalogRemove";
+    public static final String TIME_FOR_JOB_CATALOG_PUT = "timeForJobCatalogPut";
+    @Getter private final ContextAwareTimer timeForJobCatalogPut;
+    @Getter private final ContextAwareTimer timeForJobCatalogRemove;
+    public MutableStandardMetrics(JobCatalog catalog) {
+      super(catalog);
+      timeForJobCatalogPut = catalog.getMetricContext().contextAwareTimer(TIME_FOR_JOB_CATALOG_PUT, 1, TimeUnit.MINUTES);
+      timeForJobCatalogRemove =  catalog.getMetricContext().contextAwareTimer(TIME_FOR_JOB_CATALOG_REMOVE, 1, TimeUnit.MINUTES);
+    }
+
+    public void updatePutJobTime(long startTime) {
+      log.info("updatePutJobTime...");
+      Instrumented.updateTimer(Optional.of(this.timeForJobCatalogPut), System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
+    }
+
+    public void updateRemoveJobTime(long startTime) {
+      log.info("updateRemoveJobTime...");
+      Instrumented.updateTimer(Optional.of(this.timeForJobCatalogRemove), System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public Collection<ContextAwareTimer> getTimers() {
+      Collection<ContextAwareTimer> all = new ArrayList<>();
+      all.addAll(super.getTimers());
+      all.add(this.timeForJobCatalogPut);
+      all.add(this.timeForJobCatalogRemove);
+      return all;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
index f63600a..3aa16be 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
@@ -18,7 +18,17 @@
 package org.apache.gobblin.runtime.api;
 
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareTimer;
+
+import com.google.common.base.Optional;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 
 
 /**
@@ -38,4 +48,36 @@ public interface MutableSpecCatalog extends SpecCatalog {
    * Throws SpecNotFoundException if such {@link Spec} does not exist.
    */
   void remove(URI uri) throws SpecNotFoundException;
+
+  @Slf4j
+  public static class MutableStandardMetrics extends StandardMetrics {
+    public static final String TIME_FOR_SPEC_CATALOG_REMOVE = "timeForSpecCatalogRemove";
+    public static final String TIME_FOR_SPEC_CATALOG_PUT = "timeForSpecCatalogPut";
+    @Getter private final ContextAwareTimer timeForSpecCatalogPut;
+    @Getter private final ContextAwareTimer timeForSpecCatalogRemove;
+    public MutableStandardMetrics(SpecCatalog catalog) {
+      super(catalog);
+      timeForSpecCatalogPut = catalog.getMetricContext().contextAwareTimer(TIME_FOR_SPEC_CATALOG_PUT, 1, TimeUnit.MINUTES);
+      timeForSpecCatalogRemove =  catalog.getMetricContext().contextAwareTimer(TIME_FOR_SPEC_CATALOG_REMOVE, 1, TimeUnit.MINUTES);
+    }
+
+    public void updatePutSpecTime(long startTime) {
+      log.info("updatePutSpecTime...");
+      Instrumented.updateTimer(Optional.of(this.timeForSpecCatalogPut), System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
+    }
+
+    public void updateRemoveSpecTime(long startTime) {
+      log.info("updateRemoveSpecTime...");
+      Instrumented.updateTimer(Optional.of(this.timeForSpecCatalogRemove), System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public Collection<ContextAwareTimer> getTimers() {
+      Collection<ContextAwareTimer> all = new ArrayList<>();
+      all.addAll(super.getTimers());
+      all.add(this.timeForSpecCatalogPut);
+      all.add(this.timeForSpecCatalogRemove);
+      return all;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
index d2af9b8..4b85ea9 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
@@ -19,20 +19,29 @@ package org.apache.gobblin.runtime.api;
 
 import java.net.URI;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 
 import com.codahale.metrics.Gauge;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 
 import org.apache.gobblin.instrumented.GobblinMetricsKeys;
-import org.apache.gobblin.instrumented.Instrumentable;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
 import org.apache.gobblin.metrics.ContextAwareCounter;
 import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.ContextAwareHistogram;
+import org.apache.gobblin.metrics.ContextAwareTimer;
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
 
 import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 
 
-public interface SpecCatalog extends SpecCatalogListenersContainer, Instrumentable {
+public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetricsBridge {
   /** Returns an immutable {@link Collection} of {@link Spec}s that are known to the catalog. */
   Collection<Spec> getSpecs();
 
@@ -40,13 +49,18 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, Instrumentab
    * ({@link #isInstrumentationEnabled()}) is false. */
   SpecCatalog.StandardMetrics getMetrics();
 
+  default StandardMetricsBridge.StandardMetrics getStandardMetrics() {
+    return this.getMetrics();
+  }
+
   /**
    * Get a {@link Spec} by uri.
    * @throws SpecNotFoundException if no such Spec exists
    **/
   Spec getSpec(URI uri) throws SpecNotFoundException;
 
-  public static class StandardMetrics implements SpecCatalogListener {
+  @Slf4j
+  public static class StandardMetrics extends StandardMetricsBridge.StandardMetrics implements SpecCatalogListener {
     public static final String NUM_ACTIVE_SPECS_NAME = "numActiveSpecs";
     public static final String NUM_ADDED_SPECS = "numAddedSpecs";
     public static final String NUM_DELETED_SPECS = "numDeletedSpecs";
@@ -55,28 +69,65 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, Instrumentab
     public static final String SPEC_ADDED_OPERATION_TYPE = "SpecAdded";
     public static final String SPEC_DELETED_OPERATION_TYPE = "SpecDeleted";
     public static final String SPEC_UPDATED_OPERATION_TYPE = "SpecUpdated";
+    public static final String TIME_FOR_SPEC_CATALOG_GET = "timeForSpecCatalogGet";
+    public static final String HISTOGRAM_FOR_SPEC_ADD = "histogramForSpecAdd";
+    public static final String HISTOGRAM_FOR_SPEC_UPDATE = "histogramForSpecUpdate";
+    public static final String HISTOGRAM_FOR_SPEC_DELETE = "histogramForSpecDelete";
 
-    @Getter
-    private final ContextAwareGauge<Integer> numActiveSpecs;
+    @Getter private final ContextAwareGauge<Integer> numActiveSpecs;
     @Getter private final ContextAwareCounter numAddedSpecs;
     @Getter private final ContextAwareCounter numDeletedSpecs;
     @Getter private final ContextAwareCounter numUpdatedSpecs;
+    @Getter private final ContextAwareTimer timeForSpecCatalogGet;
+    @Getter private final ContextAwareHistogram histogramForSpecAdd;
+    @Getter private final ContextAwareHistogram histogramForSpecUpdate;
+    @Getter private final ContextAwareHistogram histogramForSpecDelete;
+
+    public StandardMetrics(final SpecCatalog specCatalog) {
+      MetricContext context = specCatalog.getMetricContext();
+      this.timeForSpecCatalogGet = context.contextAwareTimer(TIME_FOR_SPEC_CATALOG_GET, 1, TimeUnit.MINUTES);
+      this.numAddedSpecs = context.contextAwareCounter(NUM_ADDED_SPECS);
+      this.numDeletedSpecs = context.contextAwareCounter(NUM_DELETED_SPECS);
+      this.numUpdatedSpecs = context.contextAwareCounter(NUM_UPDATED_SPECS);
+      this.numActiveSpecs = context.newContextAwareGauge(NUM_ACTIVE_SPECS_NAME,  ()->{
+          long startTime = System.currentTimeMillis();
+          int size = specCatalog.getSpecs().size();
+          updateGetSpecTime(startTime);
+          return size;
+      });
+      this.histogramForSpecAdd = context.contextAwareHistogram(HISTOGRAM_FOR_SPEC_ADD, 1, TimeUnit.MINUTES);
+      this.histogramForSpecUpdate = context.contextAwareHistogram(HISTOGRAM_FOR_SPEC_UPDATE, 1, TimeUnit.MINUTES);
+      this.histogramForSpecDelete = context.contextAwareHistogram(HISTOGRAM_FOR_SPEC_DELETE, 1, TimeUnit.MINUTES);
+    }
+
+    public void updateGetSpecTime(long startTime) {
+      log.info("updateGetSpecTime...");
+      Instrumented.updateTimer(Optional.of(this.timeForSpecCatalogGet), System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public Collection<ContextAwareGauge<?>> getGauges() {
+      return Collections.singleton(this.numActiveSpecs);
+    }
 
-    public StandardMetrics(final SpecCatalog parent) {
-      this.numAddedSpecs = parent.getMetricContext().contextAwareCounter(NUM_ADDED_SPECS);
-      this.numDeletedSpecs = parent.getMetricContext().contextAwareCounter(NUM_DELETED_SPECS);
-      this.numUpdatedSpecs = parent.getMetricContext().contextAwareCounter(NUM_UPDATED_SPECS);
-      this.numActiveSpecs = parent.getMetricContext().newContextAwareGauge(NUM_ACTIVE_SPECS_NAME,
-          new Gauge<Integer>() {
-            @Override public Integer getValue() {
-              return parent.getSpecs().size();
-            }
-          });
-      parent.addListener(this);
+    @Override
+    public Collection<ContextAwareCounter> getCounters() {
+      return ImmutableList.of(numAddedSpecs, numDeletedSpecs, numUpdatedSpecs);
+    }
+
+    @Override
+    public Collection<ContextAwareHistogram> getHistograms() {
+      return ImmutableList.of(histogramForSpecAdd, histogramForSpecDelete, histogramForSpecUpdate);
+    }
+
+    @Override
+    public Collection<ContextAwareTimer> getTimers() {
+      return ImmutableList.of(this.timeForSpecCatalogGet);
     }
 
     @Override public void onAddSpec(Spec addedSpec) {
       this.numAddedSpecs.inc();
+      this.histogramForSpecAdd.update(1);
       submitTrackingEvent(addedSpec, SPEC_ADDED_OPERATION_TYPE);
     }
 
@@ -100,12 +151,14 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, Instrumentab
     @Override
     public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
       this.numDeletedSpecs.inc();
+      this.histogramForSpecDelete.update(1);
       submitTrackingEvent(deletedSpecURI, deletedSpecVersion, SPEC_DELETED_OPERATION_TYPE);
     }
 
     @Override
     public void onUpdateSpec(Spec updatedSpec) {
       this.numUpdatedSpecs.inc();
+      this.histogramForSpecUpdate.update(1);
       submitTrackingEvent(updatedSpec, SPEC_UPDATED_OPERATION_TYPE);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java
index be06b1e..8d89b97 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java
@@ -17,6 +17,7 @@
 package org.apache.gobblin.runtime.job_catalog;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.JobCatalog;
 import org.apache.gobblin.runtime.api.JobCatalogWithTemplates;
 import org.apache.gobblin.runtime.api.JobTemplate;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
@@ -62,7 +63,7 @@ public class FSJobCatalog extends ImmutableFSJobCatalog implements MutableJobCat
   private static final Logger LOGGER = LoggerFactory.getLogger(FSJobCatalog.class);
   public static final String CONF_EXTENSION = ".conf";
   private static final String FS_SCHEME = "FS";
-
+  private final MutableStandardMetrics mutableMetrics;
   /**
    * Initialize the JobCatalog, fetch all jobs in jobConfDirPath.
    * @param sysConfig
@@ -71,15 +72,24 @@ public class FSJobCatalog extends ImmutableFSJobCatalog implements MutableJobCat
   public FSJobCatalog(Config sysConfig)
       throws IOException {
     super(sysConfig);
+    this.mutableMetrics = (MutableStandardMetrics)metrics;
   }
 
   public FSJobCatalog(GobblinInstanceEnvironment env) throws IOException {
     super(env);
+    this.mutableMetrics = (MutableStandardMetrics)metrics;
   }
 
   public FSJobCatalog(Config sysConfig, Optional<MetricContext> parentMetricContext,
       boolean instrumentationEnabled) throws IOException{
     super(sysConfig, null, parentMetricContext, instrumentationEnabled);
+    this.mutableMetrics = (MutableStandardMetrics)metrics;
+  }
+
+  @Override
+  protected JobCatalog.StandardMetrics createStandardMetrics() {
+    log.info("create standard metrics {} for {}", MutableStandardMetrics.class.getName(), this.getClass().getName());
+    return new MutableStandardMetrics(this);
   }
 
   /**
@@ -94,6 +104,7 @@ public class FSJobCatalog extends ImmutableFSJobCatalog implements MutableJobCat
   protected FSJobCatalog(Config sysConfig, PathAlterationObserver observer)
       throws IOException {
     super(sysConfig, observer);
+    this.mutableMetrics = (MutableStandardMetrics)this.metrics;
   }
 
   /**
@@ -108,8 +119,10 @@ public class FSJobCatalog extends ImmutableFSJobCatalog implements MutableJobCat
     Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
     Preconditions.checkNotNull(jobSpec);
     try {
+      long startTime = System.currentTimeMillis();
       Path jobSpecPath = getPathForURI(this.jobConfDirPath, jobSpec.getUri());
       materializedJobSpec(jobSpecPath, jobSpec, this.fs);
+      this.mutableMetrics.updatePutJobTime(startTime);
     } catch (IOException e) {
       throw new RuntimeException("When persisting a new JobSpec, unexpected issues happen:" + e.getMessage());
     } catch (JobSpecNotFoundException e) {
@@ -126,10 +139,12 @@ public class FSJobCatalog extends ImmutableFSJobCatalog implements MutableJobCat
   public synchronized void remove(URI jobURI) {
     Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
     try {
+      long startTime = System.currentTimeMillis();
       Path jobSpecPath = getPathForURI(this.jobConfDirPath, jobURI);
 
       if (fs.exists(jobSpecPath)) {
         fs.delete(jobSpecPath, false);
+        this.mutableMetrics.updateRemoveJobTime(startTime);
       } else {
         LOGGER.warn("No file with URI:" + jobSpecPath + " is found. Deletion failed.");
       }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java
index ec33226..d40c962 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.runtime.job_catalog;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
@@ -68,7 +69,8 @@ public abstract class JobCatalogBase extends AbstractIdleService implements JobC
       MetricContext realParentCtx =
           parentMetricContext.or(Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(), getClass()));
       this.metricContext = realParentCtx.childBuilder(JobCatalog.class.getSimpleName()).build();
-      this.metrics = new StandardMetrics(this);
+      this.metrics = createStandardMetrics();
+      this.addListener(this.metrics);
     }
     else {
       this.metricContext = null;
@@ -76,6 +78,10 @@ public abstract class JobCatalogBase extends AbstractIdleService implements JobC
     }
   }
 
+  protected StandardMetrics createStandardMetrics() {
+    return new StandardMetrics(this);
+  }
+
   @Override
   protected void startUp() throws IOException {
     notifyAllListeners();
@@ -87,11 +93,19 @@ public abstract class JobCatalogBase extends AbstractIdleService implements JobC
   }
 
   protected void notifyAllListeners() {
-    for (JobSpec jobSpec : getJobs()) {
+    Collection<JobSpec> jobSpecs = getJobsWithTimeUpdate();
+    for (JobSpec jobSpec : jobSpecs) {
       this.listeners.onAddJob(jobSpec);
     }
   }
 
+  private Collection<JobSpec> getJobsWithTimeUpdate() {
+    long startTime = System.currentTimeMillis();
+    Collection<JobSpec> jobSpecs = getJobs();
+    this.metrics.updateGetJobTime(startTime);
+    return jobSpecs;
+  }
+
   /**{@inheritDoc}*/
   @Override
   public synchronized void addListener(JobCatalogListener jobListener) {
@@ -99,7 +113,7 @@ public abstract class JobCatalogBase extends AbstractIdleService implements JobC
     this.listeners.addListener(jobListener);
 
     if (state() == State.RUNNING) {
-      for (JobSpec jobSpec : getJobs()) {
+      for (JobSpec jobSpec : getJobsWithTimeUpdate()) {
         JobCatalogListener.AddJobCallback addJobCallback = new JobCatalogListener.AddJobCallback(jobSpec);
         this.listeners.callbackOneListener(addJobCallback, jobListener);
       }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/StaticJobCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/StaticJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/StaticJobCatalog.java
index 80d5955..091984f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/StaticJobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/StaticJobCatalog.java
@@ -68,6 +68,9 @@ public class StaticJobCatalog extends JobCatalogBase {
     return mapBuilder.build();
   }
 
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+      value = "UR_UNINIT_READ_CALLED_FROM_SUPER_CONSTRUCTOR",
+      justification = "Uninitialized variable has been checked.")
   @Override
   public void addListener(JobCatalogListener jobListener) {
     if (this.jobs == null) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index ecfe036..482825f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -61,7 +61,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
   protected final SpecCatalogListenersList listeners;
   protected final Logger log;
   protected final MetricContext metricContext;
-  protected final FlowCatalog.StandardMetrics metrics;
+  protected final MutableStandardMetrics metrics;
   protected final SpecStore specStore;
 
   private final ClassAliasResolver<SpecStore> aliasResolver;
@@ -87,7 +87,8 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
       MetricContext realParentCtx =
           parentMetricContext.or(Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(), getClass()));
       this.metricContext = realParentCtx.childBuilder(FlowCatalog.class.getSimpleName()).build();
-      this.metrics = new StandardMetrics(this);
+      this.metrics = new MutableStandardMetrics(this);
+      this.addListener(this.metrics);
     }
     else {
       this.metricContext = null;
@@ -133,7 +134,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
    /**************************************************/
 
   protected void notifyAllListeners() {
-    for (Spec spec : getSpecs()) {
+    for (Spec spec : getSpecsWithTimeUpdate()) {
       this.listeners.onAddSpec(spec);
     }
   }
@@ -144,7 +145,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
     this.listeners.addListener(specListener);
 
     if (state() == State.RUNNING) {
-      for (Spec spec : getSpecs()) {
+      for (Spec spec : getSpecsWithTimeUpdate()) {
         SpecCatalogListener.AddSpecCallback addJobCallback = new SpecCatalogListener.AddSpecCallback(spec);
         this.listeners.callbackOneListener(addJobCallback, specListener);
       }
@@ -192,7 +193,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
   }
 
   @Override
-  public StandardMetrics getMetrics() {
+  public SpecCatalog.StandardMetrics getMetrics() {
     return this.metrics;
   }
 
@@ -209,6 +210,17 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
     }
   }
 
+  public Collection<Spec> getSpecsWithTimeUpdate() {
+    try {
+      long startTime = System.currentTimeMillis();
+      Collection<Spec> specs = specStore.getSpecs();
+      this.metrics.updateGetSpecTime(startTime);
+      return specs;
+    } catch (IOException e) {
+      throw new RuntimeException("Cannot retrieve Specs from Spec store", e);
+    }
+  }
+
   public boolean exists(URI uri) {
     try {
       return specStore.exists(uri);
@@ -232,9 +244,11 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
       Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
       Preconditions.checkNotNull(spec);
 
+      long startTime = System.currentTimeMillis();
       log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", spec.getUri(),
           ((FlowSpec) spec).getConfigAsProperties()));
       specStore.addSpec(spec);
+      metrics.updatePutSpecTime(startTime);
       this.listeners.onAddSpec(spec);
     } catch (IOException e) {
       throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
@@ -246,9 +260,10 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
     try {
       Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
       Preconditions.checkNotNull(uri);
-
+      long startTime = System.currentTimeMillis();
       log.info(String.format("Removing FlowSpec with URI: %s", uri));
       specStore.deleteSpec(uri);
+      this.metrics.updateRemoveSpecTime(startTime);
       this.listeners.onDeleteSpec(uri, FlowSpec.Builder.DEFAULT_VERSION);
 
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
index 7bb8b9c..c334d2b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
@@ -95,6 +95,7 @@ public class TopologyCatalog extends AbstractIdleService implements SpecCatalog,
           parentMetricContext.or(Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(), getClass()));
       this.metricContext = realParentCtx.childBuilder(TopologyCatalog.class.getSimpleName()).build();
       this.metrics = new SpecCatalog.StandardMetrics(this);
+      this.addListener(this.metrics);
     }
     else {
       this.metricContext = null;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index a13ed28..2cbb113 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -17,6 +17,12 @@
 
 package org.apache.gobblin.service.modules.core;
 
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.ContextAwareHistogram;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.Schedule;
 import java.io.IOException;
@@ -28,7 +34,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
+import javax.annotation.Nonnull;
 import lombok.Getter;
 
 import org.apache.commons.cli.CommandLine;
@@ -37,7 +45,6 @@ import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -55,6 +62,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Splitter;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
 import com.google.common.eventbus.EventBus;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -94,7 +102,7 @@ import org.apache.gobblin.util.ConfigUtils;
 
 
 @Alpha
-public class GobblinServiceManager implements ApplicationLauncher {
+public class GobblinServiceManager implements ApplicationLauncher, StandardMetricsBridge{
 
   private static final Logger LOGGER = LoggerFactory.getLogger(GobblinServiceManager.class);
 
@@ -134,6 +142,9 @@ public class GobblinServiceManager implements ApplicationLauncher {
   @Getter
   protected Config config;
 
+  private final MetricContext metricContext;
+  private final Metrics metrics;
+
   public GobblinServiceManager(String serviceName, String serviceId, Config config,
       Optional<Path> serviceWorkDirOptional) throws Exception {
 
@@ -143,7 +154,8 @@ public class GobblinServiceManager implements ApplicationLauncher {
       properties.setProperty(ServiceBasedAppLauncher.APP_STOP_TIME_SECONDS, Long.toString(300));
     }
     this.config = config;
-
+    this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.getClass());
+    this.metrics = new Metrics(this.metricContext);
     this.serviceId = serviceId;
     this.serviceLauncher = new ServiceBasedAppLauncher(properties, serviceName);
 
@@ -435,6 +447,50 @@ public class GobblinServiceManager implements ApplicationLauncher {
     this.serviceLauncher.close();
   }
 
+  @Override
+  public StandardMetrics getStandardMetrics() {
+    return this.metrics;
+  }
+
+  @Nonnull
+  @Override
+  public MetricContext getMetricContext() {
+    return this.metricContext;
+  }
+
+  @Override
+  public boolean isInstrumentationEnabled() {
+    return false;
+  }
+
+  @Override
+  public List<Tag<?>> generateTags(State state) {
+    return null;
+  }
+
+  @Override
+  public void switchMetricContext(List<Tag<?>> tags) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void switchMetricContext(MetricContext context) {
+    throw new UnsupportedOperationException();
+  }
+
+  private class Metrics extends StandardMetrics {
+    public static final String SERVICE_LEADERSHIP_CHANGE = "serviceLeadershipChange";
+    private ContextAwareHistogram serviceLeadershipChange;
+    public Metrics(final MetricContext metricContext) {
+      serviceLeadershipChange = metricContext.contextAwareHistogram(SERVICE_LEADERSHIP_CHANGE, 1, TimeUnit.MINUTES);
+    }
+
+    @Override
+    public Collection<ContextAwareHistogram> getHistograms() {
+      return ImmutableList.of(this.serviceLeadershipChange);
+    }
+  }
+
   /**
    * A custom {@link MessageHandlerFactory} for {@link ControllerUserDefinedMessageHandler}s that
    * handle messages of type {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1be74552/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 0c45daf..5c26445 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -109,7 +109,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
       // the onAddSpec will forward specs to the leader, which is itself.
       this.isActive = isActive;
       if (this.flowCatalog.isPresent()) {
-        Collection<Spec> specs = this.flowCatalog.get().getSpecs();
+        Collection<Spec> specs = this.flowCatalog.get().getSpecsWithTimeUpdate();
         for (Spec spec : specs) {
           onAddSpec(spec);
         }