You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:30:59 UTC

[41/50] incubator-gobblin git commit: [GOBBLIN-419] Add more metrics for cluster job monitoring

[GOBBLIN-419] Add more metrics for cluster job monitoring

Closes #2296 from yukuai518/metrics


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/5e6bfb07
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/5e6bfb07
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/5e6bfb07

Branch: refs/heads/0.12.0
Commit: 5e6bfb079cfdd3d2026e7cd674dd6673933437d3
Parents: 4c15fde
Author: Kuai Yu <ku...@linkedin.com>
Authored: Wed Feb 28 09:04:36 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Feb 28 09:04:36 2018 -0800

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |   5 +
 .../gobblin/cluster/GobblinClusterManager.java  |  31 ++---
 .../cluster/GobblinHelixJobLauncher.java        |   3 +
 .../cluster/GobblinHelixJobScheduler.java       | 125 +++++++------------
 .../gobblin/cluster/GobblinTaskRunner.java      |  82 +++++++++---
 .../cluster/GobblinTaskRunnerMetrics.java       |  78 ++++++++++++
 .../instrumented/StandardMetricsBridge.java     |  52 ++++----
 .../service/StreamingKafkaSpecConsumer.java     |  44 ++-----
 .../apache/gobblin/runtime/TaskExecutor.java    |  33 ++++-
 .../apache/gobblin/runtime/api/JobCatalog.java  |  39 +++---
 .../runtime/api/JobExecutionLauncher.java       |   6 +-
 .../gobblin/runtime/api/MutableJobCatalog.java  |  23 ++--
 .../gobblin/runtime/api/MutableSpecCatalog.java |  23 ++--
 .../apache/gobblin/runtime/api/SpecCatalog.java |  39 +++---
 .../runtime/job_catalog/FSJobCatalog.java       |   4 +-
 .../job_catalog/ImmutableFSJobCatalog.java      |   2 +-
 .../runtime/job_catalog/JobCatalogBase.java     |  12 +-
 .../runtime/spec_catalog/FlowCatalog.java       |   2 +-
 .../runtime/spec_catalog/TopologyCatalog.java   |   2 +-
 .../modules/core/GobblinServiceManager.java     |  28 +----
 20 files changed, 359 insertions(+), 274 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index c80ceaf..612fd8b 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -169,6 +169,9 @@ public class ConfigurationKeys {
 
   public static final String WORK_UNIT_RETRY_POLICY_KEY = "workunit.retry.policy";
   public static final String WORK_UNIT_RETRY_ENABLED_KEY = "workunit.retry.enabled";
+  public static final String WORK_UNIT_CREATION_TIME_IN_MILLIS = "workunit.creation.time.in.millis";
+  public static final String WORK_UNIT_CREATION_AND_RUN_INTERVAL = "workunit.creation.and.run.interval";
+
   public static final String JOB_RUN_ONCE_KEY = "job.runonce";
   public static final String JOB_DISABLED_KEY = "job.disabled";
   public static final String JOB_JAR_FILES_KEY = "job.jars";
@@ -631,6 +634,8 @@ public class ConfigurationKeys {
   public static final String METRICS_REPORT_INTERVAL_KEY = METRICS_CONFIGURATIONS_PREFIX + "report.interval";
   public static final String DEFAULT_METRICS_REPORT_INTERVAL = Long.toString(TimeUnit.SECONDS.toMillis(30));
   public static final String METRIC_CONTEXT_NAME_KEY = "metrics.context.name";
+  public static final String METRIC_TIMER_WINDOW_SIZE_IN_MINUTES = METRICS_CONFIGURATIONS_PREFIX + "timer.window.size.in.minutes";
+  public static final int DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES = 15;
 
   // File-based reporting
   public static final String METRICS_REPORTING_FILE_ENABLED_KEY =

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 3393df6..d57c61e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -40,6 +40,7 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.instrumented.StandardMetricsBridge;
 import org.apache.gobblin.metrics.ContextAwareHistogram;
+import org.apache.gobblin.metrics.ContextAwareMetric;
 import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.hadoop.conf.Configuration;
@@ -155,7 +156,7 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
   private GobblinHelixJobScheduler jobScheduler;
   @Getter
   private JobConfigurationManager jobConfigurationManager;
-  
+
   private final String clusterName;
   private final Config config;
   private final MetricContext metricContext;
@@ -165,7 +166,7 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
     this.clusterName = clusterName;
     this.config = config;
     this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.getClass());
-    this.metrics = new Metrics(this.metricContext);
+    this.metrics = new Metrics(this.metricContext, this.config);
     this.isStandaloneMode = ConfigUtils.getBoolean(config, GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE_KEY,
         GobblinClusterConfigurationKeys.DEFAULT_STANDALONE_CLUSTER_MODE);
 
@@ -557,21 +558,6 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
     return GobblinMetrics.isEnabled(ConfigUtils.configToProperties(this.config));
   }
 
-  @Override
-  public List<Tag<?>> generateTags(State state) {
-    return ImmutableList.of();
-  }
-
-  @Override
-  public void switchMetricContext(List<Tag<?>> tags) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void switchMetricContext(MetricContext context) {
-    throw new UnsupportedOperationException();
-  }
-
   /**
    * A custom implementation of {@link LiveInstanceChangeListener}.
    */
@@ -588,19 +574,16 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
   private class Metrics extends StandardMetrics {
     public static final String CLUSTER_LEADERSHIP_CHANGE = "clusterLeadershipChange";
     private ContextAwareHistogram clusterLeadershipChange;
-    public Metrics(final MetricContext metricContext) {
-      clusterLeadershipChange = metricContext.contextAwareHistogram(CLUSTER_LEADERSHIP_CHANGE, 1, TimeUnit.MINUTES);
+    public Metrics(final MetricContext metricContext, final Config config) {
+      int timeWindowSizeInMinutes = ConfigUtils.getInt(config, ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES, ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES);
+      this.clusterLeadershipChange = metricContext.contextAwareHistogram(CLUSTER_LEADERSHIP_CHANGE, timeWindowSizeInMinutes, TimeUnit.MINUTES);
+      this.contextAwareMetrics.add(clusterLeadershipChange);
     }
 
     @Override
     public String getName() {
       return GobblinClusterManager.class.getName();
     }
-
-    @Override
-    public Collection<ContextAwareHistogram> getHistograms() {
-      return ImmutableList.of(this.clusterLeadershipChange);
-    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 62c9b3f..d502462 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -183,6 +183,9 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
   @Override
   protected void runWorkUnits(List<WorkUnit> workUnits) throws Exception {
     try {
+      long workUnitStartTime = System.currentTimeMillis();
+      workUnits.forEach((k) -> k.setProp(ConfigurationKeys.WORK_UNIT_CREATION_TIME_IN_MILLIS, workUnitStartTime));
+
       // Start the output TaskState collector service
       this.taskStateCollectorService.startAsync().awaitRunning();
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 141e3d1..48b12f2 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -28,13 +28,13 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.helix.HelixManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.eventbus.EventBus;
@@ -47,7 +47,7 @@ import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent;
 import org.apache.gobblin.cluster.event.UpdateJobConfigArrivalEvent;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.instrumented.StandardMetricsBridge;
-import org.apache.gobblin.metrics.ContextAwareCounter;
+import org.apache.gobblin.metrics.ContextAwareMetric;
 import org.apache.gobblin.metrics.ContextAwareGauge;
 import org.apache.gobblin.metrics.ContextAwareTimer;
 import org.apache.gobblin.metrics.GobblinMetrics;
@@ -121,21 +121,6 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
   }
 
   @Override
-  public List<Tag<?>> generateTags(org.apache.gobblin.configuration.State state) {
-    return null;
-  }
-
-  @Override
-  public void switchMetricContext(List<Tag<?>> tags) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void switchMetricContext(MetricContext context) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
   public StandardMetrics getStandardMetrics() {
     return metrics;
   }
@@ -147,29 +132,25 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
     private final AtomicLong totalJobsCommitted;
     private final AtomicLong totalJobsFailed;
     private final AtomicLong totalJobsCancelled;
-    private final ContextAwareGauge<Long> numJobsLaunched;
-    private final ContextAwareGauge<Long> numJobsCompleted;
-    private final ContextAwareGauge<Long> numJobsCommitted;
-    private final ContextAwareGauge<Long> numJobsFailed;
-    private final ContextAwareGauge<Long> numJobsCancelled;
-    private final ContextAwareGauge<Integer> numJobsRunning;
-
-    private final ContextAwareTimer timeForJobCompletion;
-    private final ContextAwareTimer timeForJobFailure;
+
+    private final ContextAwareTimer timeForCompletedJobs;
+    private final ContextAwareTimer timeForFailedJobs;
+    private final ContextAwareTimer timeForCommittedJobs;
     private final ContextAwareTimer timeBeforeJobScheduling;
     private final ContextAwareTimer timeBeforeJobLaunching;
+    private final ContextAwareTimer timeBetwenJobSchedulingAndLaunching;
 
     private final ThreadPoolExecutor threadPoolExecutor;
-    private final ContextAwareGauge<Integer> executorActiveCount;
-    private final ContextAwareGauge<Integer> executorMaximumPoolSize;
-    private final ContextAwareGauge<Integer> executorPoolSize;
-    private final ContextAwareGauge<Integer> executorCorePoolSize;
-    private final ContextAwareGauge<Integer> executorQueueSize;
 
     public Metrics(final MetricContext metricContext) {
       // Thread executor reference from job scheduler
       this.threadPoolExecutor = (ThreadPoolExecutor)GobblinHelixJobScheduler.this.jobExecutor;
 
+      // timer duration setup
+      int windowSize = ConfigUtils.getInt(ConfigUtils.propertiesToConfig(GobblinHelixJobScheduler.this.properties),
+          ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES,
+          ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES);
+
       // All historical counters
       this.totalJobsLaunched = new AtomicLong(0);
       this.totalJobsCompleted = new AtomicLong(0);
@@ -177,25 +158,34 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
       this.totalJobsFailed = new AtomicLong(0);
       this.totalJobsCancelled = new AtomicLong(0);
 
-      this.numJobsLaunched = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_LAUNCHED, ()->this.totalJobsLaunched.get());
-      this.numJobsCompleted = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMPLETED, ()->this.totalJobsCompleted.get());
-      this.numJobsCommitted = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMMITTED, ()->this.totalJobsCommitted.get());
-      this.numJobsFailed = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_FAILED, ()->this.totalJobsFailed.get());
-      this.numJobsCancelled = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_CANCELLED, ()->this.totalJobsCancelled.get());
-      this.numJobsRunning = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_RUNNING,
-          ()->(int)(Metrics.this.totalJobsLaunched.get() - Metrics.this.totalJobsCompleted.get()));
-
-      this.timeForJobCompletion = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_COMPLETION, 1, TimeUnit.MINUTES);
-      this.timeForJobFailure = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_FAILURE,1, TimeUnit.MINUTES);
-      this.timeBeforeJobScheduling = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BEFORE_JOB_SCHEDULING, 1, TimeUnit.MINUTES);
-      this.timeBeforeJobLaunching = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BEFORE_JOB_LAUNCHING, 1, TimeUnit.MINUTES);
+      this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_LAUNCHED, ()->this.totalJobsLaunched.get()));
+      this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMPLETED, ()->this.totalJobsCompleted.get()));
+      this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMMITTED, ()->this.totalJobsCommitted.get()));
+      this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_FAILED, ()->this.totalJobsFailed.get()));
+      this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_CANCELLED, ()->this.totalJobsCancelled.get()));
+      this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_RUNNING,
+          ()->(int)(Metrics.this.totalJobsLaunched.get() - Metrics.this.totalJobsCompleted.get())));
+
+      this.timeForCompletedJobs = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_COMPLETED_JOBS, windowSize, TimeUnit.MINUTES);
+      this.timeForFailedJobs = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_FAILED_JOBS, windowSize, TimeUnit.MINUTES);
+      this.timeForCommittedJobs = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_COMMITTED_JOBS, windowSize, TimeUnit.MINUTES);
+      this.timeBeforeJobScheduling = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BEFORE_JOB_SCHEDULING, windowSize, TimeUnit.MINUTES);
+      this.timeBeforeJobLaunching = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BEFORE_JOB_LAUNCHING, windowSize, TimeUnit.MINUTES);
+      this.timeBetwenJobSchedulingAndLaunching = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BETWEEN_JOB_SCHEDULING_AND_LAUNCHING, windowSize, TimeUnit.MINUTES);
 
       // executor metrics
-      this.executorActiveCount = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_ACTIVE_COUNT, ()->this.threadPoolExecutor.getActiveCount());
-      this.executorMaximumPoolSize = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_MAX_POOL_SIZE, ()->this.threadPoolExecutor.getMaximumPoolSize());
-      this.executorPoolSize = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_POOL_SIZE, ()->this.threadPoolExecutor.getPoolSize());
-      this.executorCorePoolSize =  metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_CORE_POOL_SIZE, ()->this.threadPoolExecutor.getCorePoolSize());
-      this.executorQueueSize = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_QUEUE_SIZE, ()->this.threadPoolExecutor.getQueue().size());
+      this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_ACTIVE_COUNT, ()->this.threadPoolExecutor.getActiveCount()));
+      this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_MAX_POOL_SIZE, ()->this.threadPoolExecutor.getMaximumPoolSize()));
+      this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_POOL_SIZE, ()->this.threadPoolExecutor.getPoolSize()));
+      this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_CORE_POOL_SIZE, ()->this.threadPoolExecutor.getCorePoolSize()));
+      this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_QUEUE_SIZE, ()->this.threadPoolExecutor.getQueue().size()));
+
+      this.contextAwareMetrics.add(timeForCommittedJobs);
+      this.contextAwareMetrics.add(timeForCompletedJobs);
+      this.contextAwareMetrics.add(timeForFailedJobs);
+      this.contextAwareMetrics.add(timeBeforeJobScheduling);
+      this.contextAwareMetrics.add(timeBeforeJobLaunching);
+      this.contextAwareMetrics.add(timeBetwenJobSchedulingAndLaunching);
     }
 
     private void updateTimeBeforeJobScheduling (Properties jobConfig) {
@@ -208,36 +198,13 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
       Instrumented.updateTimer(Optional.of(timeBeforeJobLaunching), System.currentTimeMillis() - jobCreationTime, TimeUnit.MILLISECONDS);
     }
 
-    @Override
-    public String getName() {
-      return GobblinHelixJobScheduler.class.getName();
-    }
-
-    @Override
-    public Collection<ContextAwareGauge<?>> getGauges() {
-      List<ContextAwareGauge<?>> list = Lists.newArrayList();
-      list.add(numJobsRunning);
-      list.add(numJobsLaunched);
-      list.add(numJobsCompleted);
-      list.add(numJobsCommitted);
-      list.add(numJobsFailed);
-      list.add(numJobsCancelled);
-      list.add(executorActiveCount);
-      list.add(executorMaximumPoolSize);
-      list.add(executorPoolSize);
-      list.add(executorCorePoolSize);
-      list.add(executorQueueSize);
-      return list;
+    private void updateTimeBetweenJobSchedulingAndJobLaunching (long scheduledTime, long launchingTime) {
+      Instrumented.updateTimer(Optional.of(timeBetwenJobSchedulingAndLaunching), launchingTime - scheduledTime, TimeUnit.MILLISECONDS);
     }
 
     @Override
-    public Collection<ContextAwareCounter> getCounters() {
-      return ImmutableList.of();
-    }
-
-    @Override
-    public Collection<ContextAwareTimer> getTimers() {
-      return ImmutableList.of(timeForJobCompletion, timeForJobFailure, timeBeforeJobScheduling, timeBeforeJobLaunching);
+    public String getName() {
+      return GobblinHelixJobScheduler.class.getName();
     }
   }
 
@@ -265,12 +232,13 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
       long startTime = jobContext.getJobState().getPropAsLong(START_TIME);
       if (GobblinHelixJobScheduler.this.isInstrumentationEnabled()) {
         metrics.totalJobsCompleted.incrementAndGet();
-        Instrumented.updateTimer(Optional.of(metrics.timeForJobCompletion), System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+        Instrumented.updateTimer(Optional.of(metrics.timeForCompletedJobs), System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
         if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) {
             metrics.totalJobsFailed.incrementAndGet();
-            Instrumented.updateTimer(Optional.of(metrics.timeForJobFailure), System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+            Instrumented.updateTimer(Optional.of(metrics.timeForFailedJobs), System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
         } else {
             metrics.totalJobsCommitted.incrementAndGet();
+            Instrumented.updateTimer(Optional.of(metrics.timeForCommittedJobs), System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
         }
       }
     }
@@ -382,17 +350,20 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
     private final String jobUri;
     private final Properties jobConfig;
     private final JobListener jobListener;
+    private final Long creationTimeInMillis;
 
     public NonScheduledJobRunner(String jobUri, Properties jobConfig, JobListener jobListener) {
       this.jobUri = jobUri;
       this.jobConfig = jobConfig;
       this.jobListener = jobListener;
+      this.creationTimeInMillis = System.currentTimeMillis();
     }
 
     @Override
     public void run() {
       try {
         ((MetricsTrackingListener)jobListener).metrics.updateTimeBeforeJobLaunching(this.jobConfig);
+        ((MetricsTrackingListener)jobListener).metrics.updateTimeBetweenJobSchedulingAndJobLaunching(this.creationTimeInMillis, System.currentTimeMillis());
         GobblinHelixJobScheduler.this.runJob(this.jobConfig, this.jobListener);
 
         // remove non-scheduled job catalog once done so it won't be re-executed

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index a3fddab..3ec40dc 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -36,6 +36,11 @@ import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -81,6 +86,9 @@ import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.JvmUtils;
 import org.apache.gobblin.util.PathUtils;
 
+import javax.annotation.Nonnull;
+import lombok.Getter;
+
 import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR;
 
 
@@ -109,7 +117,7 @@ import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER
  * @author Yinan Li
  */
 @Alpha
-public class GobblinTaskRunner {
+public class GobblinTaskRunner implements StandardMetricsBridge {
 
   private static final Logger logger = LoggerFactory.getLogger(GobblinTaskRunner.class);
   static final java.nio.file.Path CLUSTER_CONF_PATH = Paths.get("generated-gobblin-cluster.conf");
@@ -141,6 +149,8 @@ public class GobblinTaskRunner {
   private final String applicationName;
   private final String applicationId;
   private final Path appWorkPath;
+  private final MetricContext metricContext;
+  private final StandardMetricsBridge.StandardMetrics metrics;
 
   public GobblinTaskRunner(String applicationName, String helixInstanceName, String applicationId,
       String taskRunnerId, Config config, Optional<Path> appWorkDirOptional)
@@ -160,8 +170,10 @@ public class GobblinTaskRunner {
     initHelixManager();
 
     this.containerMetrics = buildContainerMetrics();
-
-    this.taskStateModelFactory = registerHelixTaskFactory();
+    TaskFactoryBuilder builder = new TaskFactoryBuilder(this.config);
+    this.taskStateModelFactory = createTaskStateModelFactory(builder.build());
+    this.metrics = builder.getTaskMetrics();
+    this.metricContext = builder.getMetricContext();
 
     services.addAll(getServices());
     if (services.isEmpty()) {
@@ -174,6 +186,38 @@ public class GobblinTaskRunner {
         applicationName, helixInstanceName, applicationId, taskRunnerId, config, appWorkDirOptional);
   }
 
+  private class TaskFactoryBuilder {
+    private final boolean isRunTaskInSeparateProcessEnabled;
+    private final TaskFactory taskFactory;
+    @Getter
+    private final MetricContext metricContext;
+    @Getter
+    private StandardMetricsBridge.StandardMetrics taskMetrics;
+
+    public TaskFactoryBuilder(Config config) {
+      isRunTaskInSeparateProcessEnabled = getIsRunTaskInSeparateProcessEnabled(config);
+      metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.getClass());
+      if (isRunTaskInSeparateProcessEnabled) {
+        logger.info("Running a task in a separate process is enabled.");
+        taskFactory = new HelixTaskFactory(GobblinTaskRunner.this.containerMetrics, CLUSTER_CONF_PATH, config);
+        taskMetrics = new GobblinTaskRunnerMetrics.JvmTaskRunnerMetrics();
+      } else {
+        Properties properties = ConfigUtils.configToProperties(config);
+        TaskExecutor taskExecutor = new TaskExecutor(properties);
+        taskFactory = getInProcessTaskFactory(taskExecutor);
+        taskMetrics = new GobblinTaskRunnerMetrics.InProcessTaskRunnerMetrics(taskExecutor, metricContext);
+      }
+    }
+
+    public TaskFactory build(){
+       return taskFactory;
+    }
+
+    private Boolean getIsRunTaskInSeparateProcessEnabled(Config config) {
+      return ConfigUtils.getBoolean(config, GobblinClusterConfigurationKeys.ENABLE_TASK_IN_SEPARATE_PROCESS, false);
+    }
+  }
+
   private Path initAppWorkDir(Config config, Optional<Path> appWorkDirOptional) {
     return appWorkDirOptional.isPresent() ? appWorkDirOptional.get() : GobblinClusterUtils
         .getAppWorkDirPathFromConfig(config, this.fs, this.applicationName, this.applicationId);
@@ -189,19 +233,10 @@ public class GobblinTaskRunner {
         this.helixInstanceName, InstanceType.PARTICIPANT, zkConnectionString);
   }
 
-  private TaskStateModelFactory registerHelixTaskFactory() {
+  private TaskStateModelFactory createTaskStateModelFactory(TaskFactory factory) {
     Map<String, TaskFactory> taskFactoryMap = Maps.newHashMap();
 
-    boolean isRunTaskInSeparateProcessEnabled = getIsRunTaskInSeparateProcessEnabled();
-    TaskFactory taskFactory;
-    if (isRunTaskInSeparateProcessEnabled) {
-      logger.info("Running a task in a separate process is enabled.");
-      taskFactory = new HelixTaskFactory(this.containerMetrics, CLUSTER_CONF_PATH, config);
-    } else {
-      taskFactory = getInProcessTaskFactory();
-    }
-
-    taskFactoryMap.put(GOBBLIN_TASK_FACTORY_NAME, taskFactory);
+    taskFactoryMap.put(GOBBLIN_TASK_FACTORY_NAME, factory);
     TaskStateModelFactory taskStateModelFactory =
         new TaskStateModelFactory(this.helixManager, taskFactoryMap);
     this.helixManager.getStateMachineEngine()
@@ -209,14 +244,13 @@ public class GobblinTaskRunner {
     return taskStateModelFactory;
   }
 
-  private TaskFactory getInProcessTaskFactory() {
+  private TaskFactory getInProcessTaskFactory(TaskExecutor taskExecutor) {
     Properties properties = ConfigUtils.configToProperties(this.config);
     URI rootPathUri = PathUtils.getRootPath(this.appWorkPath).toUri();
     Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(properties)
         .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY,
             ConfigValueFactory.fromAnyRef(rootPathUri.toString()));
 
-    TaskExecutor taskExecutor = new TaskExecutor(properties);
     TaskStateTracker taskStateTracker = new GobblinHelixTaskStateTracker(properties);
 
     services.add(taskExecutor);
@@ -385,6 +419,22 @@ public class GobblinTaskRunner {
     }
   }
 
+  @Override
+  public StandardMetrics getStandardMetrics() {
+    return this.metrics;
+  }
+
+  @Nonnull
+  @Override
+  public MetricContext getMetricContext() {
+    return this.metricContext;
+  }
+
+  @Override
+  public boolean isInstrumentationEnabled() {
+    return GobblinMetrics.isEnabled(this.config);
+  }
+
   /**
    * A custom {@link MessageHandlerFactory} for {@link ParticipantShutdownMessageHandler}s that handle messages
    * of type "SHUTDOWN" for shutting down the participants.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java
new file mode 100644
index 0000000..51e8b36
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import com.codahale.metrics.Metric;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.ContextAwareMetric;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.runtime.TaskExecutor;
+
+
+public class GobblinTaskRunnerMetrics {
+
+  static class InProcessTaskRunnerMetrics extends StandardMetricsBridge.StandardMetrics {
+    private TaskExecutor taskExecutor;
+    private static String CURRENT_QUEUED_TASK_COUNT = "currentQueuedTaskCount";
+    private static String HISTORICAL_QUEUED_TASK_COUNT = "historicalQueuedTaskCount";
+    private static String QUEUED_TASK_COUNT = "queuedTaskCount";
+    private static String CURRENT_QUEUED_TASK_TOTAL_TIME = "currentQueuedTaskTotalTime";
+    private static String HISTORICAL_QUEUED_TASK_TOTAL_TIME = "historicalQueuedTaskTotalTime";
+    private static String QUEUED_TASK_TOTAL_TIME = "queuedTaskTotalTime";
+    private static String FAILED_TASK_COUNT = "failedTaskCount";
+    private static String SUCCESSFUL_TASK_COUNT = "successfulTaskCount";
+    private static String RUNNING_TASK_COUNT = "runningTaskCount";
+
+    public InProcessTaskRunnerMetrics (TaskExecutor executor, MetricContext context) {
+      taskExecutor = executor;
+      contextAwareMetrics.add(context.newContextAwareGauge(CURRENT_QUEUED_TASK_COUNT, ()->this.taskExecutor.getCurrentQueuedTaskCount().longValue()));
+      contextAwareMetrics.add(context.newContextAwareGauge(CURRENT_QUEUED_TASK_TOTAL_TIME, ()->this.taskExecutor.getCurrentQueuedTaskTotalTime().longValue()));
+      contextAwareMetrics.add(context.newContextAwareGauge(HISTORICAL_QUEUED_TASK_COUNT, ()->this.taskExecutor.getHistoricalQueuedTaskCount().longValue()));
+      contextAwareMetrics.add(context.newContextAwareGauge(HISTORICAL_QUEUED_TASK_TOTAL_TIME, ()->this.taskExecutor.getHistoricalQueuedTaskTotalTime().longValue()));
+      contextAwareMetrics.add(context.newContextAwareGauge(QUEUED_TASK_COUNT, ()->this.taskExecutor.getQueuedTaskCount().longValue()));
+      contextAwareMetrics.add(context.newContextAwareGauge(QUEUED_TASK_TOTAL_TIME, ()->this.taskExecutor.getQueuedTaskTotalTime().longValue()));
+      contextAwareMetrics.add(context.newContextAwareGauge(FAILED_TASK_COUNT, ()->this.taskExecutor.getFailedTaskCount().getCount()));
+      contextAwareMetrics.add(context.newContextAwareGauge(SUCCESSFUL_TASK_COUNT, ()->this.taskExecutor.getSuccessfulTaskCount().getCount()));
+      contextAwareMetrics.add(context.newContextAwareGauge(RUNNING_TASK_COUNT, ()->this.taskExecutor.getRunningTaskCount().getCount()));
+
+      this.rawMetrics.put(ConfigurationKeys.WORK_UNIT_CREATION_AND_RUN_INTERVAL, this.taskExecutor.getTaskCreateAndRunTimer());
+    }
+
+    @Override
+    public String getName() {
+      return InProcessTaskRunnerMetrics.class.getName();
+    }
+  }
+
+  static class JvmTaskRunnerMetrics extends StandardMetricsBridge.StandardMetrics {
+    //TODO: add metrics to monitor the process execution status (will be revisited after process isolation work is done)
+    @Override
+    public String getName() {
+      return JvmTaskRunnerMetrics.class.getName();
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java
index 3993dce..355139b 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java
@@ -18,15 +18,18 @@
 package org.apache.gobblin.instrumented;
 
 import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 
-import org.apache.gobblin.metrics.ContextAwareCounter;
-import org.apache.gobblin.metrics.ContextAwareGauge;
-import org.apache.gobblin.metrics.ContextAwareHistogram;
-import org.apache.gobblin.metrics.ContextAwareMeter;
-import org.apache.gobblin.metrics.ContextAwareTimer;
-
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
+import org.apache.gobblin.metrics.ContextAwareMetric;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
 
 /**
  * This interface indicates a class will expose its metrics to some external systems.
@@ -35,30 +38,37 @@ public interface StandardMetricsBridge extends Instrumentable {
 
   StandardMetrics getStandardMetrics();
 
-  public class StandardMetrics {
+  default void switchMetricContext(MetricContext context) {
+    throw new UnsupportedOperationException();
+  }
 
-    public String getName() {
-      return this.getClass().getName();
-    }
+  default void switchMetricContext(List<Tag<?>> tags) {
+    throw new UnsupportedOperationException();
+  }
 
-    public Collection<ContextAwareGauge<?>> getGauges() {
-      return ImmutableList.of();
-    }
+  default List<Tag<?>> generateTags(org.apache.gobblin.configuration.State state) {
+    return ImmutableList.of();
+  }
+
+  public class StandardMetrics implements MetricSet {
+    protected final List<ContextAwareMetric> contextAwareMetrics;
+    protected final Map<String, Metric> rawMetrics;
 
-    public Collection<ContextAwareCounter> getCounters() {
-      return ImmutableList.of();
+    public StandardMetrics() {
+      this.contextAwareMetrics = Lists.newArrayList();
+      this.rawMetrics = Maps.newHashMap();
     }
 
-    public Collection<ContextAwareMeter> getMeters() {
-      return ImmutableList.of();
+    public String getName() {
+      return this.getClass().getName();
     }
 
-    public Collection<ContextAwareTimer> getTimers() {
-      return ImmutableList.of();
+    public Collection<ContextAwareMetric> getContextAwareMetrics() {
+      return contextAwareMetrics;
     }
 
-    public Collection<ContextAwareHistogram> getHistograms() {
-      return ImmutableList.of();
+    public Map<String, Metric> getMetrics() {
+      return rawMetrics;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
index 5fd5413..6d8de39 100644
--- a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
@@ -34,6 +34,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.instrumented.StandardMetricsBridge;
 import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.ContextAwareMetric;
 import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
@@ -199,12 +200,6 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S
   }
 
   private class Metrics extends StandardMetricsBridge.StandardMetrics {
-    private ContextAwareGauge<Integer> jobSpecQueueSize;
-    private ContextAwareGauge<Long> jobSpecEnq;
-    private ContextAwareGauge<Long> jobSpecDeq;
-    private ContextAwareGauge<Long> jobSpecConsumed;
-    private ContextAwareGauge<Long> jobSpecParseFailures;
-
     private AtomicLong jobSpecEnqCount = new AtomicLong(0);
     private AtomicLong jobSpecDeqCount = new AtomicLong(0);
 
@@ -215,12 +210,12 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S
     public static final String SPEC_CONSUMER_JOB_SPEC_PARSE_FAILURES = "specConsumerJobSpecParseFailures";
 
     public Metrics(MetricContext context) {
-      this.jobSpecQueueSize = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_QUEUE_SIZE, ()->StreamingKafkaSpecConsumer.this._jobSpecQueue.size());
-      this.jobSpecEnq = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_ENQ, ()->jobSpecEnqCount.get());
-      this.jobSpecDeq = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_DEQ, ()->jobSpecDeqCount.get());
-      this.jobSpecConsumed = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_CONSUMED,
-          ()->getNewSpecs() + getRemovedSpecs() + getMessageParseFailures());
-      this.jobSpecParseFailures = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_PARSE_FAILURES, ()->getMessageParseFailures());
+      this.contextAwareMetrics.add(context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_QUEUE_SIZE, ()->StreamingKafkaSpecConsumer.this._jobSpecQueue.size()));
+      this.contextAwareMetrics.add(context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_ENQ, ()->jobSpecEnqCount.get()));
+      this.contextAwareMetrics.add(context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_DEQ, ()->jobSpecDeqCount.get()));
+      this.contextAwareMetrics.add(context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_CONSUMED,
+          ()->getNewSpecs() + getRemovedSpecs() + getMessageParseFailures()));
+      this.contextAwareMetrics.add(context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_PARSE_FAILURES, ()->getMessageParseFailures()));
     }
 
     private long getNewSpecs() {
@@ -237,16 +232,6 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S
       return StreamingKafkaSpecConsumer.this._jobMonitor.getMessageParseFailures() != null?
           StreamingKafkaSpecConsumer.this._jobMonitor.getMessageParseFailures().getCount():0;
     }
-
-    public Collection<ContextAwareGauge<?>> getGauges() {
-      List list = Lists.newArrayList();
-      list.add(jobSpecQueueSize);
-      list.add(jobSpecEnq);
-      list.add(jobSpecDeq);
-      list.add(jobSpecConsumed);
-      list.add(jobSpecParseFailures);
-      return list;
-    }
   }
 
   @Override
@@ -264,19 +249,4 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S
   public boolean isInstrumentationEnabled() {
     return _isInstrumentedEnabled;
   }
-
-  @Override
-  public List<Tag<?>> generateTags(org.apache.gobblin.configuration.State state) {
-    return ImmutableList.of();
-  }
-
-  @Override
-  public void switchMetricContext(List<Tag<?>> tags) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void switchMetricContext(MetricContext context) {
-    throw new UnsupportedOperationException();
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
index be78275..b868893 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
@@ -39,6 +39,8 @@ import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricSet;
+import com.codahale.metrics.SlidingTimeWindowReservoir;
+import com.codahale.metrics.Timer;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
@@ -75,12 +77,15 @@ public class TaskExecutor extends AbstractIdleService {
   private final ExecutorService forkExecutor;
 
   // Task retry interval
+  @Getter
   private final long retryIntervalInSeconds;
 
   // The maximum number of items in the queued task time map.
+  @Getter
   private final int queuedTaskTimeMaxSize;
 
   // The maximum age of the items in the queued task time map.
+  @Getter
   private final long queuedTaskTimeMaxAge ;
 
   // Map of queued task ids to queue times.  The key is the task id, the value is the time the task was queued.  If the
@@ -96,32 +101,44 @@ public class TaskExecutor extends AbstractIdleService {
   private long lastCalculationTime = 0;
 
   // The total number of tasks currently queued and queued over the historical lookback period.
+  @Getter
   private AtomicInteger queuedTaskCount = new AtomicInteger();
 
   // The total number of tasks currently queued.
+  @Getter
   private AtomicInteger currentQueuedTaskCount = new AtomicInteger();
 
   // The total number of tasks queued over the historical lookback period.
+  @Getter
   private AtomicInteger historicalQueuedTaskCount = new AtomicInteger();
 
   // The total time tasks have currently been in the queue and were in the queue during the historical lookback period.
+  @Getter
   private AtomicLong queuedTaskTotalTime = new AtomicLong();
 
   // The total time tasks have currently been in the queue.
+  @Getter
   private AtomicLong currentQueuedTaskTotalTime = new AtomicLong();
 
   // The total time tasks have been in the queue during the historical lookback period.
+  @Getter
   private AtomicLong historicalQueuedTaskTotalTime = new AtomicLong();
 
   // Count of running tasks.
+  @Getter
   private final Counter runningTaskCount = new Counter();
 
   // Count of successful tasks.
+  @Getter
   private final Meter successfulTaskCount = new Meter();
 
   // Count of failed tasks.
+  @Getter
   private final Meter failedTaskCount = new Meter();
 
+  @Getter
+  private final Timer taskCreateAndRunTimer;
+
   // The metric set exposed from the task executor.
   private final TaskExecutorQueueMetricSet metricSet = new TaskExecutorQueueMetricSet();
 
@@ -129,7 +146,7 @@ public class TaskExecutor extends AbstractIdleService {
    * Constructor used internally.
    */
   private TaskExecutor(int taskExecutorThreadPoolSize, int coreRetryThreadPoolSize, long retryIntervalInSeconds,
-                       int queuedTaskTimeMaxSize, long queuedTaskTimeMaxAge) {
+                       int queuedTaskTimeMaxSize, long queuedTaskTimeMaxAge, int timerWindowSize) {
     Preconditions.checkArgument(taskExecutorThreadPoolSize > 0, "Task executor thread pool size should be positive");
     Preconditions.checkArgument(retryIntervalInSeconds > 0, "Task retry interval should be positive");
     Preconditions.checkArgument(queuedTaskTimeMaxSize > 0, "Queued task time max size should be positive");
@@ -143,6 +160,7 @@ public class TaskExecutor extends AbstractIdleService {
     this.retryIntervalInSeconds = retryIntervalInSeconds;
     this.queuedTaskTimeMaxSize = queuedTaskTimeMaxSize;
     this.queuedTaskTimeMaxAge = queuedTaskTimeMaxAge;
+    this.taskCreateAndRunTimer = new Timer(new SlidingTimeWindowReservoir(timerWindowSize, TimeUnit.MINUTES));
 
     this.forkExecutor = ExecutorsUtils.loggingDecorator(
         new ThreadPoolExecutor(
@@ -175,7 +193,9 @@ public class TaskExecutor extends AbstractIdleService {
         Integer.parseInt(properties.getProperty(ConfigurationKeys.QUEUED_TASK_TIME_MAX_SIZE,
             Integer.toString(ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_SIZE))),
         Long.parseLong(properties.getProperty(ConfigurationKeys.QUEUED_TASK_TIME_MAX_AGE,
-            Long.toString(ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_AGE))));
+            Long.toString(ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_AGE))),
+        Integer.parseInt(properties.getProperty(ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES,
+            Integer.toString(ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES))));
   }
 
   /**
@@ -191,7 +211,9 @@ public class TaskExecutor extends AbstractIdleService {
         conf.getInt(ConfigurationKeys.QUEUED_TASK_TIME_MAX_SIZE,
             ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_SIZE),
         conf.getLong(ConfigurationKeys.QUEUED_TASK_TIME_MAX_AGE,
-            ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_AGE));
+            ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_AGE),
+        conf.getInt(ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES,
+            ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES));
   }
 
   @Override
@@ -430,7 +452,12 @@ public class TaskExecutor extends AbstractIdleService {
 
     private void onStart(long startTime) {
       Long queueTime = queuedTasks.remove(this.underlyingTask.getTaskId());
+      long workUnitCreationTime = this.underlyingTask.getTaskContext().getTaskState().getPropAsLong(ConfigurationKeys.WORK_UNIT_CREATION_TIME_IN_MILLIS, 0);
       long timeInQueue = startTime - queueTime;
+      long timeSinceWorkUnitCreation = startTime - workUnitCreationTime;
+
+      taskCreateAndRunTimer.update(timeSinceWorkUnitCreation, TimeUnit.MILLISECONDS);
+
       LOG.debug(String.format("Task %s started. Saving queued time of %d ms to history.", underlyingTask.getTaskId(), timeInQueue));
       queuedTaskTimeHistorical.putIfAbsent(System.currentTimeMillis(), timeInQueue);
       runningTaskCount.inc();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
index 42ecef3..6afa94a 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
@@ -18,23 +18,27 @@ package org.apache.gobblin.runtime.api;
 
 import java.net.URI;
 import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
 
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.GobblinMetricsKeys;
 import org.apache.gobblin.instrumented.Instrumentable;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.instrumented.StandardMetricsBridge;
-import org.apache.gobblin.metrics.ContextAwareCounter;
 import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.ContextAwareMetric;
 import org.apache.gobblin.metrics.ContextAwareTimer;
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.util.ConfigUtils;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -76,6 +80,7 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable
     public static final String JOB_UPDATED_OPERATION_TYPE = "JobUpdated";
 
     private final MetricContext metricsContext;
+    protected final int timeWindowSizeInMinutes;
     @Getter private final AtomicLong totalAddedJobs;
     @Getter private final AtomicLong totalDeletedJobs;
     @Getter private final AtomicLong totalUpdatedJobs;
@@ -85,17 +90,28 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable
     @Getter private final ContextAwareGauge<Long> totalUpdateCalls;
     @Getter private final ContextAwareGauge<Integer> numActiveJobs;
 
-    public StandardMetrics(final JobCatalog jobCatalog) {
+    public StandardMetrics(final JobCatalog jobCatalog, Optional<Config> sysConfig) {
+      // timer window size
+      this.timeWindowSizeInMinutes = sysConfig.isPresent()?
+          ConfigUtils.getInt(sysConfig.get(), ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES, ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES) :
+          ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES;
+
       this.metricsContext = jobCatalog.getMetricContext();
       this.totalAddedJobs = new AtomicLong(0);
       this.totalDeletedJobs = new AtomicLong(0);
       this.totalUpdatedJobs = new AtomicLong(0);
 
-      this.timeForJobCatalogGet = metricsContext.contextAwareTimer(TIME_FOR_JOB_CATALOG_GET, 1, TimeUnit.MINUTES);
+      this.timeForJobCatalogGet = metricsContext.contextAwareTimer(TIME_FOR_JOB_CATALOG_GET, timeWindowSizeInMinutes, TimeUnit.MINUTES);
       this.totalAddCalls = metricsContext.newContextAwareGauge(TOTAL_ADD_CALLS, ()->this.totalAddedJobs.get());
       this.totalUpdateCalls = metricsContext.newContextAwareGauge(TOTAL_UPDATE_CALLS, ()->this.totalUpdatedJobs.get());
       this.totalDeleteCalls = metricsContext.newContextAwareGauge(TOTAL_DELETE_CALLS, ()->this.totalDeletedJobs.get());
       this.numActiveJobs = metricsContext.newContextAwareGauge(NUM_ACTIVE_JOBS_NAME, ()->(int)(totalAddedJobs.get() - totalDeletedJobs.get()));
+
+      this.contextAwareMetrics.add(timeForJobCatalogGet);
+      this.contextAwareMetrics.add(totalAddCalls);
+      this.contextAwareMetrics.add(totalDeleteCalls);
+      this.contextAwareMetrics.add(totalUpdateCalls);
+      this.contextAwareMetrics.add(numActiveJobs);
     }
 
     public void updateGetJobTime(long startTime) {
@@ -136,20 +152,5 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable
       this.totalUpdatedJobs.incrementAndGet();
       submitTrackingEvent(updatedJob, JOB_UPDATED_OPERATION_TYPE);
     }
-
-    @Override
-    public Collection<ContextAwareGauge<?>> getGauges() {
-      return ImmutableList.of(totalAddCalls, totalDeleteCalls, totalUpdateCalls, numActiveJobs);
-    }
-
-    @Override
-    public Collection<ContextAwareCounter> getCounters() {
-      return ImmutableList.of();
-    }
-
-    @Override
-    public Collection<ContextAwareTimer> getTimers() {
-      return ImmutableList.of(timeForJobCatalogGet);
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
index 3f50ee7..7bc9cc0 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
@@ -44,10 +44,12 @@ public interface JobExecutionLauncher extends Instrumentable {
     public static final String NUM_JOBS_CANCELLED = "numJobsCancelled";
     public static final String NUM_JOBS_RUNNING = "numJobsRunning";
 
-    public static final String TIMER_FOR_JOB_COMPLETION = "timerForJobCompletion";
-    public static final String TIMER_FOR_JOB_FAILURE = "timerForJobFailure";
+    public static final String TIMER_FOR_COMPLETED_JOBS = "timeForCompletedJobs";
+    public static final String TIMER_FOR_FAILED_JOBS = "timeForFailedJobs";
+    public static final String TIMER_FOR_COMMITTED_JOBS = "timerForCommittedJobs";
     public static final String TIMER_BEFORE_JOB_SCHEDULING = "timerBeforeJobScheduling";
     public static final String TIMER_BEFORE_JOB_LAUNCHING = "timerBeforeJobLaunching";
+    public static final String TIMER_BETWEEN_JOB_SCHEDULING_AND_LAUNCHING = "timerBetwenJobSchedulingAndLaunching";
 
     public static final String EXECUTOR_ACTIVE_COUNT = "executorActiveCount";
     public static final String EXECUTOR_MAX_POOL_SIZE = "executorMaximumPoolSize";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java
index 8b6e98c..57dfce5 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java
@@ -22,10 +22,14 @@ import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMetric;
 import org.apache.gobblin.metrics.ContextAwareTimer;
+import org.apache.gobblin.util.ConfigUtils;
 
 import com.google.common.base.Optional;
+import com.typesafe.config.Config;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -55,10 +59,12 @@ public interface MutableJobCatalog extends JobCatalog {
     public static final String TIME_FOR_JOB_CATALOG_PUT = "timeForJobCatalogPut";
     @Getter private final ContextAwareTimer timeForJobCatalogPut;
     @Getter private final ContextAwareTimer timeForJobCatalogRemove;
-    public MutableStandardMetrics(JobCatalog catalog) {
-      super(catalog);
-      timeForJobCatalogPut = catalog.getMetricContext().contextAwareTimer(TIME_FOR_JOB_CATALOG_PUT, 1, TimeUnit.MINUTES);
-      timeForJobCatalogRemove =  catalog.getMetricContext().contextAwareTimer(TIME_FOR_JOB_CATALOG_REMOVE, 1, TimeUnit.MINUTES);
+    public MutableStandardMetrics(JobCatalog catalog, Optional<Config> sysConfig) {
+      super(catalog, sysConfig);
+      timeForJobCatalogPut = catalog.getMetricContext().contextAwareTimer(TIME_FOR_JOB_CATALOG_PUT, timeWindowSizeInMinutes, TimeUnit.MINUTES);
+      timeForJobCatalogRemove =  catalog.getMetricContext().contextAwareTimer(TIME_FOR_JOB_CATALOG_REMOVE, this.timeWindowSizeInMinutes, TimeUnit.MINUTES);
+      this.contextAwareMetrics.add(timeForJobCatalogPut);
+      this.contextAwareMetrics.add(timeForJobCatalogRemove);
     }
 
     public void updatePutJobTime(long startTime) {
@@ -70,14 +76,5 @@ public interface MutableJobCatalog extends JobCatalog {
       log.info("updateRemoveJobTime...");
       Instrumented.updateTimer(Optional.of(this.timeForJobCatalogRemove), System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
     }
-
-    @Override
-    public Collection<ContextAwareTimer> getTimers() {
-      Collection<ContextAwareTimer> all = new ArrayList<>();
-      all.addAll(super.getTimers());
-      all.add(this.timeForJobCatalogPut);
-      all.add(this.timeForJobCatalogRemove);
-      return all;
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
index 3aa16be..108a324 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
@@ -22,10 +22,14 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMetric;
 import org.apache.gobblin.metrics.ContextAwareTimer;
+import org.apache.gobblin.util.ConfigUtils;
 
 import com.google.common.base.Optional;
+import com.typesafe.config.Config;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -55,10 +59,12 @@ public interface MutableSpecCatalog extends SpecCatalog {
     public static final String TIME_FOR_SPEC_CATALOG_PUT = "timeForSpecCatalogPut";
     @Getter private final ContextAwareTimer timeForSpecCatalogPut;
     @Getter private final ContextAwareTimer timeForSpecCatalogRemove;
-    public MutableStandardMetrics(SpecCatalog catalog) {
-      super(catalog);
-      timeForSpecCatalogPut = catalog.getMetricContext().contextAwareTimer(TIME_FOR_SPEC_CATALOG_PUT, 1, TimeUnit.MINUTES);
-      timeForSpecCatalogRemove =  catalog.getMetricContext().contextAwareTimer(TIME_FOR_SPEC_CATALOG_REMOVE, 1, TimeUnit.MINUTES);
+    public MutableStandardMetrics(SpecCatalog catalog, Optional<Config> sysConfig) {
+      super(catalog, sysConfig);
+      timeForSpecCatalogPut = catalog.getMetricContext().contextAwareTimer(TIME_FOR_SPEC_CATALOG_PUT, this.timeWindowSizeInMinutes, TimeUnit.MINUTES);
+      timeForSpecCatalogRemove =  catalog.getMetricContext().contextAwareTimer(TIME_FOR_SPEC_CATALOG_REMOVE, this.timeWindowSizeInMinutes, TimeUnit.MINUTES);
+      this.contextAwareMetrics.add(timeForSpecCatalogPut);
+      this.contextAwareMetrics.add(timeForSpecCatalogRemove);
     }
 
     public void updatePutSpecTime(long startTime) {
@@ -70,14 +76,5 @@ public interface MutableSpecCatalog extends SpecCatalog {
       log.info("updateRemoveSpecTime...");
       Instrumented.updateTimer(Optional.of(this.timeForSpecCatalogRemove), System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
     }
-
-    @Override
-    public Collection<ContextAwareTimer> getTimers() {
-      Collection<ContextAwareTimer> all = new ArrayList<>();
-      all.addAll(super.getTimers());
-      all.add(this.timeForSpecCatalogPut);
-      all.add(this.timeForSpecCatalogRemove);
-      return all;
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
index 6e8510f..457be9a 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
@@ -19,21 +19,25 @@ package org.apache.gobblin.runtime.api;
 
 import java.net.URI;
 import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.GobblinMetricsKeys;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.instrumented.StandardMetricsBridge;
-import org.apache.gobblin.metrics.ContextAwareCounter;
 import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.ContextAwareMetric;
 import org.apache.gobblin.metrics.ContextAwareTimer;
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.util.ConfigUtils;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -70,6 +74,8 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetr
     public static final String TIME_FOR_SPEC_CATALOG_GET = "timeForSpecCatalogGet";
 
     private final MetricContext metricsContext;
+    protected final int timeWindowSizeInMinutes;
+
     @Getter private final AtomicLong totalAddedSpecs;
     @Getter private final AtomicLong totalDeletedSpecs;
     @Getter private final AtomicLong totalUpdatedSpecs;
@@ -80,9 +86,13 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetr
 
     @Getter private final ContextAwareTimer timeForSpecCatalogGet;
 
-    public StandardMetrics(final SpecCatalog specCatalog) {
+    public StandardMetrics(final SpecCatalog specCatalog, Optional<Config> sysConfig) {
+      this.timeWindowSizeInMinutes = sysConfig.isPresent()?
+          ConfigUtils.getInt(sysConfig.get(), ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES, ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES) :
+          ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES;
+
       this.metricsContext = specCatalog.getMetricContext();
-      this.timeForSpecCatalogGet = metricsContext.contextAwareTimer(TIME_FOR_SPEC_CATALOG_GET, 1, TimeUnit.MINUTES);
+      this.timeForSpecCatalogGet = metricsContext.contextAwareTimer(TIME_FOR_SPEC_CATALOG_GET, timeWindowSizeInMinutes, TimeUnit.MINUTES);
       this.totalAddedSpecs = new AtomicLong(0);
       this.totalDeletedSpecs = new AtomicLong(0);
       this.totalUpdatedSpecs = new AtomicLong(0);
@@ -95,6 +105,12 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetr
       this.totalAddCalls = metricsContext.newContextAwareGauge(TOTAL_ADD_CALLS, ()->this.totalAddedSpecs.get());
       this.totalUpdateCalls = metricsContext.newContextAwareGauge(TOTAL_UPDATE_CALLS, ()->this.totalUpdatedSpecs.get());
       this.totalDeleteCalls = metricsContext.newContextAwareGauge(TOTAL_DELETE_CALLS, ()->this.totalDeletedSpecs.get());
+
+      this.contextAwareMetrics.add(numActiveSpecs);
+      this.contextAwareMetrics.add(totalAddCalls);
+      this.contextAwareMetrics.add(totalUpdateCalls);
+      this.contextAwareMetrics.add(totalDeleteCalls);
+      this.contextAwareMetrics.add(timeForSpecCatalogGet);
     }
 
     public void updateGetSpecTime(long startTime) {
@@ -102,21 +118,6 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetr
       Instrumented.updateTimer(Optional.of(this.timeForSpecCatalogGet), System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
     }
 
-    @Override
-    public Collection<ContextAwareGauge<?>> getGauges() {
-      return ImmutableList.of(numActiveSpecs, totalAddCalls, totalUpdateCalls, totalDeleteCalls);
-    }
-
-    @Override
-    public Collection<ContextAwareCounter> getCounters() {
-      return ImmutableList.of();
-    }
-
-    @Override
-    public Collection<ContextAwareTimer> getTimers() {
-      return ImmutableList.of(this.timeForSpecCatalogGet);
-    }
-
     @Override public void onAddSpec(Spec addedSpec) {
       this.totalAddedSpecs.incrementAndGet();
       submitTrackingEvent(addedSpec, SPEC_ADDED_OPERATION_TYPE);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java
index 0f99235..f48b42c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java
@@ -87,9 +87,9 @@ public class FSJobCatalog extends ImmutableFSJobCatalog implements MutableJobCat
   }
 
   @Override
-  protected JobCatalog.StandardMetrics createStandardMetrics() {
+  protected JobCatalog.StandardMetrics createStandardMetrics(Optional<Config> sysConfig) {
     log.info("create standard metrics {} for {}", MutableStandardMetrics.class.getName(), this.getClass().getName());
-    return new MutableStandardMetrics(this);
+    return new MutableStandardMetrics(this, sysConfig);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java
index 7162c81..626cac2 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java
@@ -102,7 +102,7 @@ public class ImmutableFSJobCatalog extends JobCatalogBase implements JobCatalog
   public ImmutableFSJobCatalog(Config sysConfig, PathAlterationObserver observer, Optional<MetricContext> parentMetricContext,
       boolean instrumentationEnabled)
       throws IOException {
-    super(Optional.of(LOGGER), parentMetricContext, instrumentationEnabled);
+    super(Optional.of(LOGGER), parentMetricContext, instrumentationEnabled, Optional.of(sysConfig));
     this.sysConfig = sysConfig;
     ConfigAccessor cfgAccessor = new ConfigAccessor(this.sysConfig);
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java
index d40c962..dcb0723 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
 
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
@@ -63,13 +64,18 @@ public abstract class JobCatalogBase extends AbstractIdleService implements JobC
 
   public JobCatalogBase(Optional<Logger> log, Optional<MetricContext> parentMetricContext,
       boolean instrumentationEnabled) {
+    this(log, parentMetricContext, instrumentationEnabled, Optional.absent());
+  }
+
+  public JobCatalogBase(Optional<Logger> log, Optional<MetricContext> parentMetricContext,
+      boolean instrumentationEnabled, Optional<Config> sysConfig) {
     this.log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
     this.listeners = new JobCatalogListenersList(log);
     if (instrumentationEnabled) {
       MetricContext realParentCtx =
           parentMetricContext.or(Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(), getClass()));
       this.metricContext = realParentCtx.childBuilder(JobCatalog.class.getSimpleName()).build();
-      this.metrics = createStandardMetrics();
+      this.metrics = createStandardMetrics(sysConfig);
       this.addListener(this.metrics);
     }
     else {
@@ -78,8 +84,8 @@ public abstract class JobCatalogBase extends AbstractIdleService implements JobC
     }
   }
 
-  protected StandardMetrics createStandardMetrics() {
-    return new StandardMetrics(this);
+  protected StandardMetrics createStandardMetrics(Optional<Config> sysConfig) {
+    return new StandardMetrics(this, sysConfig);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 482825f..a91baed 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -87,7 +87,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
       MetricContext realParentCtx =
           parentMetricContext.or(Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(), getClass()));
       this.metricContext = realParentCtx.childBuilder(FlowCatalog.class.getSimpleName()).build();
-      this.metrics = new MutableStandardMetrics(this);
+      this.metrics = new MutableStandardMetrics(this, Optional.of(config));
       this.addListener(this.metrics);
     }
     else {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
index c334d2b..5c25a67 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
@@ -94,7 +94,7 @@ public class TopologyCatalog extends AbstractIdleService implements SpecCatalog,
       MetricContext realParentCtx =
           parentMetricContext.or(Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(), getClass()));
       this.metricContext = realParentCtx.childBuilder(TopologyCatalog.class.getSimpleName()).build();
-      this.metrics = new SpecCatalog.StandardMetrics(this);
+      this.metrics = new SpecCatalog.StandardMetrics(this, Optional.of(config));
       this.addListener(this.metrics);
     }
     else {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 2cbb113..3137c21 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -21,6 +21,7 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.instrumented.StandardMetricsBridge;
 import org.apache.gobblin.metrics.ContextAwareHistogram;
+import org.apache.gobblin.metrics.ContextAwareMetric;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.service.FlowId;
@@ -155,7 +156,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
     }
     this.config = config;
     this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.getClass());
-    this.metrics = new Metrics(this.metricContext);
+    this.metrics = new Metrics(this.metricContext, this.config);
     this.serviceId = serviceId;
     this.serviceLauncher = new ServiceBasedAppLauncher(properties, serviceName);
 
@@ -463,31 +464,14 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
     return false;
   }
 
-  @Override
-  public List<Tag<?>> generateTags(State state) {
-    return null;
-  }
-
-  @Override
-  public void switchMetricContext(List<Tag<?>> tags) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void switchMetricContext(MetricContext context) {
-    throw new UnsupportedOperationException();
-  }
-
   private class Metrics extends StandardMetrics {
     public static final String SERVICE_LEADERSHIP_CHANGE = "serviceLeadershipChange";
     private ContextAwareHistogram serviceLeadershipChange;
-    public Metrics(final MetricContext metricContext) {
-      serviceLeadershipChange = metricContext.contextAwareHistogram(SERVICE_LEADERSHIP_CHANGE, 1, TimeUnit.MINUTES);
-    }
 
-    @Override
-    public Collection<ContextAwareHistogram> getHistograms() {
-      return ImmutableList.of(this.serviceLeadershipChange);
+    public Metrics(final MetricContext metricContext, Config config) {
+      int timeWindowSizeInMinutes = ConfigUtils.getInt(config, ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES, ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES);
+      this.serviceLeadershipChange = metricContext.contextAwareHistogram(SERVICE_LEADERSHIP_CHANGE, timeWindowSizeInMinutes, TimeUnit.MINUTES);
+      this.contextAwareMetrics.add(this.serviceLeadershipChange);
     }
   }