You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2017/12/24 00:29:32 UTC

incubator-gobblin git commit: [GOBBLIN-349] Add gauges for number of job scheduler for gobblin cluster

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 0fabaa7a4 -> ab034478c


[GOBBLIN-349] Add gauges for number of job scheduler for gobblin cluster

Change number counter to guage type for gobblin
cluster metrics

Fix build

Minor change

Closes #2212 from yukuai518/me


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ab034478
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ab034478
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ab034478

Branch: refs/heads/master
Commit: ab034478c9dfa9eaa3d1b81f698fec3606833328
Parents: 0fabaa7
Author: Kuai Yu <ku...@linkedin.com>
Authored: Sun Dec 24 05:59:19 2017 +0530
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Sun Dec 24 05:59:19 2017 +0530

----------------------------------------------------------------------
 .../cluster/GobblinHelixJobScheduler.java       | 80 ++++++++------------
 .../apache/gobblin/runtime/api/JobCatalog.java  | 68 +++++++----------
 .../runtime/api/JobExecutionLauncher.java       | 29 +++----
 .../apache/gobblin/runtime/api/SpecCatalog.java | 65 +++++++---------
 .../runtime/job_catalog/FSJobCatalog.java       |  2 +-
 .../job_catalog/NonObservingFSJobCatalog.java   |  6 +-
 .../job_catalog/TestInMemoryJobCatalog.java     | 42 +++++-----
 7 files changed, 126 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ab034478/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 9fd5add..36ba542 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -20,15 +20,13 @@ package org.apache.gobblin.cluster;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.metrics.ContextAwareHistogram;
 import org.apache.hadoop.fs.Path;
 import org.apache.helix.HelixManager;
 import org.slf4j.Logger;
@@ -36,7 +34,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
@@ -143,18 +140,18 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
 
   private class Metrics extends StandardMetrics {
 
-    private final ContextAwareCounter numJobsLaunched;
-    private final ContextAwareCounter numJobsCompleted;
-    private final ContextAwareCounter numJobsCommitted;
-    private final ContextAwareCounter numJobsFailed;
-    private final ContextAwareCounter numJobsCancelled;
-    private final ContextAwareHistogram histogramJobsLaunched;
-    private final ContextAwareHistogram histogramJobsCompleted;
-    private final ContextAwareHistogram histogramJobsCommitted;
-    private final ContextAwareHistogram histogramJobsFailed;
-    private final ContextAwareHistogram histogramJobsCancelled;
-
+    private final AtomicLong totalJobsLaunched;
+    private final AtomicLong totalJobsCompleted;
+    private final AtomicLong totalJobsCommitted;
+    private final AtomicLong totalJobsFailed;
+    private final AtomicLong totalJobsCancelled;
+    private final ContextAwareGauge<Long> numJobsLaunched;
+    private final ContextAwareGauge<Long> numJobsCompleted;
+    private final ContextAwareGauge<Long> numJobsCommitted;
+    private final ContextAwareGauge<Long> numJobsFailed;
+    private final ContextAwareGauge<Long> numJobsCancelled;
     private final ContextAwareGauge<Integer> numJobsRunning;
+
     private final ContextAwareTimer timeForJobCompletion;
     private final ContextAwareTimer timeForJobFailure;
     private final ContextAwareTimer timeBeforeJobScheduling;
@@ -162,21 +159,19 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
 
     public Metrics(final MetricContext metricContext) {
       // All historical counters
-      this.numJobsLaunched = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_LAUNCHED_COUNTER);
-      this.numJobsCompleted = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMPLETED_COUNTER);
-      this.numJobsCommitted = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMMITTED_COUNTER);
-      this.numJobsFailed = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_FAILED_COUNTER);
-      this.numJobsCancelled = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_CANCELLED_COUNTER);
-
-      // Counters within last 1 minute
-      this.histogramJobsLaunched = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_LAUNCHED_HISTOGRAM, 1, TimeUnit.MINUTES);
-      this.histogramJobsCompleted = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMPLETED_HISTOGRAM, 1, TimeUnit.MINUTES);
-      this.histogramJobsCommitted = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMMITTED_HISTOGRAM, 1, TimeUnit.MINUTES);
-      this.histogramJobsFailed = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_FAILED_HISTOGRAM, 1, TimeUnit.MINUTES);
-      this.histogramJobsCancelled = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_CANCELLED_HISTOGRAM, 1, TimeUnit.MINUTES);
-
-      this.numJobsRunning = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_RUNNING_GAUGE,
-          ()->(int)(Metrics.this.numJobsLaunched.getCount() - Metrics.this.numJobsCompleted.getCount()));
+      this.totalJobsLaunched = new AtomicLong(0);
+      this.totalJobsCompleted = new AtomicLong(0);
+      this.totalJobsCommitted = new AtomicLong(0);
+      this.totalJobsFailed = new AtomicLong(0);
+      this.totalJobsCancelled = new AtomicLong(0);
+
+      this.numJobsLaunched = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_LAUNCHED, ()->this.totalJobsLaunched.get());
+      this.numJobsCompleted = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMPLETED, ()->this.totalJobsCompleted.get());
+      this.numJobsCommitted = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMMITTED, ()->this.totalJobsCommitted.get());
+      this.numJobsFailed = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_FAILED, ()->this.totalJobsFailed.get());
+      this.numJobsCancelled = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_CANCELLED, ()->this.totalJobsCancelled.get());
+      this.numJobsRunning = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_RUNNING,
+          ()->(int)(Metrics.this.totalJobsLaunched.get() - Metrics.this.totalJobsCompleted.get()));
 
       this.timeForJobCompletion = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_COMPLETION, 1, TimeUnit.MINUTES);
       this.timeForJobFailure = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_FAILURE,1, TimeUnit.MINUTES);
@@ -201,29 +196,18 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
 
     @Override
     public Collection<ContextAwareGauge<?>> getGauges() {
-      return Collections.singleton(numJobsRunning);
+      return ImmutableList.of(numJobsRunning, numJobsLaunched, numJobsCompleted, numJobsCommitted, numJobsFailed, numJobsCancelled);
     }
 
     @Override
     public Collection<ContextAwareCounter> getCounters() {
-      List<ContextAwareCounter> counters = Lists.newArrayList();
-      counters.add(numJobsLaunched);
-      counters.add(numJobsCompleted);
-      counters.add(numJobsCommitted);
-      counters.add(numJobsFailed);
-      counters.add(numJobsCancelled);
-      return counters;
+      return ImmutableList.of();
     }
 
     @Override
     public Collection<ContextAwareTimer> getTimers() {
       return ImmutableList.of(timeForJobCompletion, timeForJobFailure, timeBeforeJobScheduling, timeBeforeJobLaunching);
     }
-
-    @Override
-    public Collection<ContextAwareHistogram> getHistograms() {
-      return ImmutableList.of(histogramJobsCompleted, histogramJobsLaunched, histogramJobsFailed, histogramJobsCancelled, histogramJobsCommitted);
-    }
   }
 
   private class MetricsTrackingListener extends AbstractJobListener {
@@ -239,7 +223,7 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
       super.onJobPrepare(jobContext);
       jobContext.getJobState().setProp(START_TIME, Long.toString(System.nanoTime()));
       if (GobblinHelixJobScheduler.this.isInstrumentationEnabled()) {
-        metrics.numJobsLaunched.inc();
+        metrics.totalJobsLaunched.incrementAndGet();
       }
     }
 
@@ -249,13 +233,13 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
       super.onJobCompletion(jobContext);
       long startTime = jobContext.getJobState().getPropAsLong(START_TIME);
       if (GobblinHelixJobScheduler.this.isInstrumentationEnabled()) {
-        metrics.numJobsCompleted.inc();
+        metrics.totalJobsCompleted.incrementAndGet();
         Instrumented.updateTimer(Optional.of(metrics.timeForJobCompletion), System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
         if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) {
-            metrics.numJobsFailed.inc();
+            metrics.totalJobsFailed.incrementAndGet();
             Instrumented.updateTimer(Optional.of(metrics.timeForJobFailure), System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
         } else {
-            metrics.numJobsCommitted.inc();
+            metrics.totalJobsCommitted.incrementAndGet();
         }
       }
     }
@@ -265,7 +249,7 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
         throws Exception {
       super.onJobCancellation(jobContext);
       if (GobblinHelixJobScheduler.this.isInstrumentationEnabled()) {
-        metrics.numJobsCancelled.inc();
+        metrics.totalJobsCancelled.incrementAndGet();
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ab034478/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
index 6c0ea5b..950b86d 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
@@ -18,10 +18,9 @@ package org.apache.gobblin.runtime.api;
 
 import java.net.URI;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
-import com.codahale.metrics.Gauge;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -33,7 +32,6 @@ import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.instrumented.StandardMetricsBridge;
 import org.apache.gobblin.metrics.ContextAwareCounter;
 import org.apache.gobblin.metrics.ContextAwareGauge;
-import org.apache.gobblin.metrics.ContextAwareHistogram;
 import org.apache.gobblin.metrics.ContextAwareTimer;
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.MetricContext;
@@ -67,42 +65,42 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable
   @Slf4j
   public static class StandardMetrics extends StandardMetricsBridge.StandardMetrics implements JobCatalogListener {
     public static final String NUM_ACTIVE_JOBS_NAME = "numActiveJobs";
-    public static final String NUM_ADDED_JOBS = "numAddedJobs";
-    public static final String NUM_DELETED_JOBS = "numDeletedJobs";
-    public static final String NUM_UPDATED_JOBS = "numUpdatedJobs";
+    public static final String TOTAL_ADD_CALLS = "totalAddCalls";
+    public static final String TOTAL_DELETE_CALLS = "totalDeleteCalls";
+    public static final String TOTAL_UPDATE_CALLS = "totalUpdateCalls";
     public static final String TIME_FOR_JOB_CATALOG_GET = "timeForJobCatalogGet";
-    public static final String HISTOGRAM_FOR_JOB_ADD = "histogramForJobAdd";
-    public static final String HISTOGRAM_FOR_JOB_UPDATE = "histogramForJobUpdate";
-    public static final String HISTOGRAM_FOR_JOB_DELETE = "histogramForJobDelete";
+
     public static final String TRACKING_EVENT_NAME = "JobCatalogEvent";
     public static final String JOB_ADDED_OPERATION_TYPE = "JobAdded";
     public static final String JOB_DELETED_OPERATION_TYPE = "JobDeleted";
     public static final String JOB_UPDATED_OPERATION_TYPE = "JobUpdated";
 
-    @Getter private final ContextAwareGauge<Integer> numActiveJobs;
-    @Getter private final ContextAwareCounter numAddedJobs;
-    @Getter private final ContextAwareCounter numDeletedJobs;
-    @Getter private final ContextAwareCounter numUpdatedJobs;
+    private final MetricContext metricsContext;
+    @Getter private final AtomicLong totalAddedJobs;
+    @Getter private final AtomicLong totalDeletedJobs;
+    @Getter private final AtomicLong totalUpdatedJobs;
     @Getter private final ContextAwareTimer timeForJobCatalogGet;
-    @Getter private final ContextAwareHistogram histogramForJobAdd;
-    @Getter private final ContextAwareHistogram histogramForJobUpdate;
-    @Getter private final ContextAwareHistogram histogramForJobDelete;
+    @Getter private final ContextAwareGauge<Long> totalAddCalls;
+    @Getter private final ContextAwareGauge<Long> totalDeleteCalls;
+    @Getter private final ContextAwareGauge<Long> totalUpdateCalls;
+    @Getter private final ContextAwareGauge<Integer> numActiveJobs;
 
     public StandardMetrics(final JobCatalog jobCatalog) {
-      MetricContext context = jobCatalog.getMetricContext();
-      this.timeForJobCatalogGet = context.contextAwareTimer(TIME_FOR_JOB_CATALOG_GET, 1, TimeUnit.MINUTES);
-      this.numAddedJobs = context.contextAwareCounter(NUM_ADDED_JOBS);
-      this.numDeletedJobs = context.contextAwareCounter(NUM_DELETED_JOBS);
-      this.numUpdatedJobs = context.contextAwareCounter(NUM_UPDATED_JOBS);
-      this.numActiveJobs = context.newContextAwareGauge(NUM_ACTIVE_JOBS_NAME, ()->{
+      this.metricsContext = jobCatalog.getMetricContext();
+      this.totalAddedJobs = new AtomicLong(0);
+      this.totalDeletedJobs = new AtomicLong(0);
+      this.totalUpdatedJobs = new AtomicLong(0);
+
+      this.timeForJobCatalogGet = metricsContext.contextAwareTimer(TIME_FOR_JOB_CATALOG_GET, 1, TimeUnit.MINUTES);
+      this.totalAddCalls = metricsContext.newContextAwareGauge(TOTAL_ADD_CALLS, ()->this.totalAddedJobs.get());
+      this.totalUpdateCalls = metricsContext.newContextAwareGauge(TOTAL_UPDATE_CALLS, ()->this.totalUpdatedJobs.get());
+      this.totalDeleteCalls = metricsContext.newContextAwareGauge(TOTAL_DELETE_CALLS, ()->this.totalDeletedJobs.get());
+      this.numActiveJobs = metricsContext.newContextAwareGauge(NUM_ACTIVE_JOBS_NAME, ()->{
           long startTime = System.currentTimeMillis();
           int size = jobCatalog.getJobs().size();
           updateGetJobTime(startTime);
           return size;
       });
-      this.histogramForJobAdd = context.contextAwareHistogram(HISTOGRAM_FOR_JOB_ADD, 1, TimeUnit.MINUTES);
-      this.histogramForJobUpdate = context.contextAwareHistogram(HISTOGRAM_FOR_JOB_UPDATE, 1, TimeUnit.MINUTES);
-      this.histogramForJobDelete = context.contextAwareHistogram(HISTOGRAM_FOR_JOB_DELETE, 1, TimeUnit.MINUTES);
     }
 
     public void updateGetJobTime(long startTime) {
@@ -111,8 +109,7 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable
     }
 
     @Override public void onAddJob(JobSpec addedJob) {
-      this.numAddedJobs.inc();
-      this.histogramForJobAdd.update(1);
+      this.totalAddedJobs.incrementAndGet();
       submitTrackingEvent(addedJob, JOB_ADDED_OPERATION_TYPE);
     }
 
@@ -130,41 +127,34 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable
               .put(GobblinMetricsKeys.JOB_SPEC_VERSION_META, jobSpecVersion)
               .build())
           .build();
-      this.numAddedJobs.getContext().submitEvent(e);
+      this.metricsContext.submitEvent(e);
     }
 
     @Override
     public void onDeleteJob(URI deletedJobURI, String deletedJobVersion) {
-      this.numDeletedJobs.inc();
-      this.histogramForJobDelete.update(1);
+      this.totalDeletedJobs.incrementAndGet();
       submitTrackingEvent(deletedJobURI, deletedJobVersion, JOB_DELETED_OPERATION_TYPE);
     }
 
     @Override
     public void onUpdateJob(JobSpec updatedJob) {
-      this.numUpdatedJobs.inc();
-      this.histogramForJobUpdate.update(1);
+      this.totalUpdatedJobs.incrementAndGet();
       submitTrackingEvent(updatedJob, JOB_UPDATED_OPERATION_TYPE);
     }
 
     @Override
     public Collection<ContextAwareGauge<?>> getGauges() {
-      return Collections.singleton(this.numActiveJobs);
+      return ImmutableList.of(totalAddCalls, totalDeleteCalls, totalUpdateCalls, numActiveJobs);
     }
 
     @Override
     public Collection<ContextAwareCounter> getCounters() {
-      return ImmutableList.of(numAddedJobs, numDeletedJobs, numUpdatedJobs);
+      return ImmutableList.of();
     }
 
     @Override
     public Collection<ContextAwareTimer> getTimers() {
       return ImmutableList.of(timeForJobCatalogGet);
     }
-
-    @Override
-    public Collection<ContextAwareHistogram> getHistograms() {
-      return ImmutableList.of(histogramForJobAdd, histogramForJobDelete, histogramForJobUpdate);
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ab034478/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
index fb54139..a7e5878 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
@@ -37,17 +37,12 @@ public interface JobExecutionLauncher extends Instrumentable {
   StandardMetrics getMetrics();
 
   public static class StandardMetrics {
-    public static final String NUM_JOBS_LAUNCHED_COUNTER = "numJobsLaunched";
-    public static final String NUM_JOBS_COMPLETED_COUNTER = "numJobsCompleted";
-    public static final String NUM_JOBS_COMMITTED_COUNTER = "numJobsCommitted";
-    public static final String NUM_JOBS_FAILED_COUNTER = "numJobsFailed";
-    public static final String NUM_JOBS_CANCELLED_COUNTER = "numJobsCancelled";
-    public static final String NUM_JOBS_RUNNING_GAUGE = "numJobsRunning";
-    public static final String NUM_JOBS_LAUNCHED_HISTOGRAM = "histogramJobsLaunched";
-    public static final String NUM_JOBS_COMPLETED_HISTOGRAM = "histogramJobsCompleted";
-    public static final String NUM_JOBS_COMMITTED_HISTOGRAM = "histogramJobsCommitted";
-    public static final String NUM_JOBS_FAILED_HISTOGRAM = "histogramJobsFailed";
-    public static final String NUM_JOBS_CANCELLED_HISTOGRAM = "histogramJobsCancelled";
+    public static final String NUM_JOBS_LAUNCHED = "numJobsLaunched";
+    public static final String NUM_JOBS_COMPLETED = "numJobsCompleted";
+    public static final String NUM_JOBS_COMMITTED = "numJobsCommitted";
+    public static final String NUM_JOBS_FAILED = "numJobsFailed";
+    public static final String NUM_JOBS_CANCELLED = "numJobsCancelled";
+    public static final String NUM_JOBS_RUNNING = "numJobsRunning";
 
     public static final String TIMER_FOR_JOB_COMPLETION = "timerForJobCompletion";
     public static final String TIMER_FOR_JOB_FAILURE = "timerForJobFailure";
@@ -70,12 +65,12 @@ public interface JobExecutionLauncher extends Instrumentable {
     @Getter private final ContextAwareGauge<Integer> numJobsRunning;
 
     public StandardMetrics(final JobExecutionLauncher parent) {
-      this.numJobsLaunched = parent.getMetricContext().contextAwareCounter(NUM_JOBS_LAUNCHED_COUNTER);
-      this.numJobsCompleted = parent.getMetricContext().contextAwareCounter(NUM_JOBS_COMPLETED_COUNTER);
-      this.numJobsCommitted = parent.getMetricContext().contextAwareCounter(NUM_JOBS_COMMITTED_COUNTER);
-      this.numJobsFailed = parent.getMetricContext().contextAwareCounter(NUM_JOBS_FAILED_COUNTER);
-      this.numJobsCancelled = parent.getMetricContext().contextAwareCounter(NUM_JOBS_CANCELLED_COUNTER);
-      this.numJobsRunning = parent.getMetricContext().newContextAwareGauge(NUM_JOBS_RUNNING_GAUGE,
+      this.numJobsLaunched = parent.getMetricContext().contextAwareCounter(NUM_JOBS_LAUNCHED);
+      this.numJobsCompleted = parent.getMetricContext().contextAwareCounter(NUM_JOBS_COMPLETED);
+      this.numJobsCommitted = parent.getMetricContext().contextAwareCounter(NUM_JOBS_COMMITTED);
+      this.numJobsFailed = parent.getMetricContext().contextAwareCounter(NUM_JOBS_FAILED);
+      this.numJobsCancelled = parent.getMetricContext().contextAwareCounter(NUM_JOBS_CANCELLED);
+      this.numJobsRunning = parent.getMetricContext().newContextAwareGauge(NUM_JOBS_RUNNING,
             new Gauge<Integer>() {
               @Override public Integer getValue() {
                 return (int)(StandardMetrics.this.getNumJobsLaunched().getCount() -

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ab034478/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
index 4b85ea9..6e8510f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
@@ -19,10 +19,9 @@ package org.apache.gobblin.runtime.api;
 
 import java.net.URI;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
-import com.codahale.metrics.Gauge;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -32,7 +31,6 @@ import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.instrumented.StandardMetricsBridge;
 import org.apache.gobblin.metrics.ContextAwareCounter;
 import org.apache.gobblin.metrics.ContextAwareGauge;
-import org.apache.gobblin.metrics.ContextAwareHistogram;
 import org.apache.gobblin.metrics.ContextAwareTimer;
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.MetricContext;
@@ -62,42 +60,41 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetr
   @Slf4j
   public static class StandardMetrics extends StandardMetricsBridge.StandardMetrics implements SpecCatalogListener {
     public static final String NUM_ACTIVE_SPECS_NAME = "numActiveSpecs";
-    public static final String NUM_ADDED_SPECS = "numAddedSpecs";
-    public static final String NUM_DELETED_SPECS = "numDeletedSpecs";
-    public static final String NUM_UPDATED_SPECS = "numUpdatedSpecs";
+    public static final String TOTAL_ADD_CALLS = "totalAddCalls";
+    public static final String TOTAL_DELETE_CALLS = "totalDeleteCalls";
+    public static final String TOTAL_UPDATE_CALLS = "totalUpdateCalls";
     public static final String TRACKING_EVENT_NAME = "SpecCatalogEvent";
     public static final String SPEC_ADDED_OPERATION_TYPE = "SpecAdded";
     public static final String SPEC_DELETED_OPERATION_TYPE = "SpecDeleted";
     public static final String SPEC_UPDATED_OPERATION_TYPE = "SpecUpdated";
     public static final String TIME_FOR_SPEC_CATALOG_GET = "timeForSpecCatalogGet";
-    public static final String HISTOGRAM_FOR_SPEC_ADD = "histogramForSpecAdd";
-    public static final String HISTOGRAM_FOR_SPEC_UPDATE = "histogramForSpecUpdate";
-    public static final String HISTOGRAM_FOR_SPEC_DELETE = "histogramForSpecDelete";
 
+    private final MetricContext metricsContext;
+    @Getter private final AtomicLong totalAddedSpecs;
+    @Getter private final AtomicLong totalDeletedSpecs;
+    @Getter private final AtomicLong totalUpdatedSpecs;
+    @Getter private final ContextAwareGauge<Long> totalAddCalls;
+    @Getter private final ContextAwareGauge<Long> totalDeleteCalls;
+    @Getter private final ContextAwareGauge<Long> totalUpdateCalls;
     @Getter private final ContextAwareGauge<Integer> numActiveSpecs;
-    @Getter private final ContextAwareCounter numAddedSpecs;
-    @Getter private final ContextAwareCounter numDeletedSpecs;
-    @Getter private final ContextAwareCounter numUpdatedSpecs;
+
     @Getter private final ContextAwareTimer timeForSpecCatalogGet;
-    @Getter private final ContextAwareHistogram histogramForSpecAdd;
-    @Getter private final ContextAwareHistogram histogramForSpecUpdate;
-    @Getter private final ContextAwareHistogram histogramForSpecDelete;
 
     public StandardMetrics(final SpecCatalog specCatalog) {
-      MetricContext context = specCatalog.getMetricContext();
-      this.timeForSpecCatalogGet = context.contextAwareTimer(TIME_FOR_SPEC_CATALOG_GET, 1, TimeUnit.MINUTES);
-      this.numAddedSpecs = context.contextAwareCounter(NUM_ADDED_SPECS);
-      this.numDeletedSpecs = context.contextAwareCounter(NUM_DELETED_SPECS);
-      this.numUpdatedSpecs = context.contextAwareCounter(NUM_UPDATED_SPECS);
-      this.numActiveSpecs = context.newContextAwareGauge(NUM_ACTIVE_SPECS_NAME,  ()->{
+      this.metricsContext = specCatalog.getMetricContext();
+      this.timeForSpecCatalogGet = metricsContext.contextAwareTimer(TIME_FOR_SPEC_CATALOG_GET, 1, TimeUnit.MINUTES);
+      this.totalAddedSpecs = new AtomicLong(0);
+      this.totalDeletedSpecs = new AtomicLong(0);
+      this.totalUpdatedSpecs = new AtomicLong(0);
+      this.numActiveSpecs = metricsContext.newContextAwareGauge(NUM_ACTIVE_SPECS_NAME,  ()->{
           long startTime = System.currentTimeMillis();
           int size = specCatalog.getSpecs().size();
           updateGetSpecTime(startTime);
           return size;
       });
-      this.histogramForSpecAdd = context.contextAwareHistogram(HISTOGRAM_FOR_SPEC_ADD, 1, TimeUnit.MINUTES);
-      this.histogramForSpecUpdate = context.contextAwareHistogram(HISTOGRAM_FOR_SPEC_UPDATE, 1, TimeUnit.MINUTES);
-      this.histogramForSpecDelete = context.contextAwareHistogram(HISTOGRAM_FOR_SPEC_DELETE, 1, TimeUnit.MINUTES);
+      this.totalAddCalls = metricsContext.newContextAwareGauge(TOTAL_ADD_CALLS, ()->this.totalAddedSpecs.get());
+      this.totalUpdateCalls = metricsContext.newContextAwareGauge(TOTAL_UPDATE_CALLS, ()->this.totalUpdatedSpecs.get());
+      this.totalDeleteCalls = metricsContext.newContextAwareGauge(TOTAL_DELETE_CALLS, ()->this.totalDeletedSpecs.get());
     }
 
     public void updateGetSpecTime(long startTime) {
@@ -107,17 +104,12 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetr
 
     @Override
     public Collection<ContextAwareGauge<?>> getGauges() {
-      return Collections.singleton(this.numActiveSpecs);
+      return ImmutableList.of(numActiveSpecs, totalAddCalls, totalUpdateCalls, totalDeleteCalls);
     }
 
     @Override
     public Collection<ContextAwareCounter> getCounters() {
-      return ImmutableList.of(numAddedSpecs, numDeletedSpecs, numUpdatedSpecs);
-    }
-
-    @Override
-    public Collection<ContextAwareHistogram> getHistograms() {
-      return ImmutableList.of(histogramForSpecAdd, histogramForSpecDelete, histogramForSpecUpdate);
+      return ImmutableList.of();
     }
 
     @Override
@@ -126,8 +118,7 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetr
     }
 
     @Override public void onAddSpec(Spec addedSpec) {
-      this.numAddedSpecs.inc();
-      this.histogramForSpecAdd.update(1);
+      this.totalAddedSpecs.incrementAndGet();
       submitTrackingEvent(addedSpec, SPEC_ADDED_OPERATION_TYPE);
     }
 
@@ -145,20 +136,18 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetr
               .put(GobblinMetricsKeys.SPEC_VERSION_META, specSpecVersion)
               .build())
           .build();
-      this.numAddedSpecs.getContext().submitEvent(e);
+      this.metricsContext.submitEvent(e);
     }
 
     @Override
     public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
-      this.numDeletedSpecs.inc();
-      this.histogramForSpecDelete.update(1);
+      this.totalDeletedSpecs.incrementAndGet();
       submitTrackingEvent(deletedSpecURI, deletedSpecVersion, SPEC_DELETED_OPERATION_TYPE);
     }
 
     @Override
     public void onUpdateSpec(Spec updatedSpec) {
-      this.numUpdatedSpecs.inc();
-      this.histogramForSpecUpdate.update(1);
+      this.totalUpdatedSpecs.incrementAndGet();
       submitTrackingEvent(updatedSpec, SPEC_UPDATED_OPERATION_TYPE);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ab034478/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java
index 8d89b97..0f99235 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java
@@ -63,7 +63,7 @@ public class FSJobCatalog extends ImmutableFSJobCatalog implements MutableJobCat
   private static final Logger LOGGER = LoggerFactory.getLogger(FSJobCatalog.class);
   public static final String CONF_EXTENSION = ".conf";
   private static final String FS_SCHEME = "FS";
-  private final MutableStandardMetrics mutableMetrics;
+  protected final MutableStandardMetrics mutableMetrics;
   /**
    * Initialize the JobCatalog, fetch all jobs in jobConfDirPath.
    * @param sysConfig

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ab034478/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java
index eebf6f9..76adca4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java
@@ -77,11 +77,12 @@ public class NonObservingFSJobCatalog extends FSJobCatalog {
     Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
     Preconditions.checkNotNull(jobSpec);
     try {
+      long startTime = System.currentTimeMillis();
       Path jobSpecPath = getPathForURI(this.jobConfDirPath, jobSpec.getUri());
       boolean isUpdate = fs.exists(jobSpecPath);
 
       materializedJobSpec(jobSpecPath, jobSpec, this.fs);
-
+      this.mutableMetrics.updatePutJobTime(startTime);
       if (isUpdate) {
         this.listeners.onUpdateJob(jobSpec);
       } else {
@@ -103,12 +104,13 @@ public class NonObservingFSJobCatalog extends FSJobCatalog {
   public synchronized void remove(URI jobURI) {
     Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
     try {
+      long startTime = System.currentTimeMillis();
       JobSpec jobSpec = getJobSpec(jobURI);
-
       Path jobSpecPath = getPathForURI(this.jobConfDirPath, jobURI);
 
       if (fs.exists(jobSpecPath)) {
         fs.delete(jobSpecPath, false);
+        this.mutableMetrics.updateRemoveJobTime(startTime);
       } else {
         LOGGER.warn("No file with URI:" + jobSpecPath + " is found. Deletion failed.");
       }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ab034478/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/TestInMemoryJobCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/TestInMemoryJobCatalog.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/TestInMemoryJobCatalog.java
index 5920f78..026c888 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/TestInMemoryJobCatalog.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/TestInMemoryJobCatalog.java
@@ -94,9 +94,9 @@ public class TestInMemoryJobCatalog {
 
     cat.put(js1_1);
     Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 1);
-    Assert.assertEquals(cat.getMetrics().getNumAddedJobs().getCount(), 1);
-    Assert.assertEquals(cat.getMetrics().getNumUpdatedJobs().getCount(), 0);
-    Assert.assertEquals(cat.getMetrics().getNumDeletedJobs().getCount(), 0);
+    Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 1);
+    Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 0);
+    Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 0);
     ma.assertEvent(Predicates.and(
         MetricsAssert.eqEventNamespace(JobCatalog.class.getName()),
         MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME),
@@ -109,9 +109,9 @@ public class TestInMemoryJobCatalog {
 
     cat.put(js1_2);
     Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 1);
-    Assert.assertEquals(cat.getMetrics().getNumAddedJobs().getCount(), 1);
-    Assert.assertEquals(cat.getMetrics().getNumUpdatedJobs().getCount(), 1);
-    Assert.assertEquals(cat.getMetrics().getNumDeletedJobs().getCount(), 0);
+    Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 1);
+    Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 1);
+    Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 0);
     ma.assertEvent(Predicates.and(
         MetricsAssert.eqEventNamespace(JobCatalog.class.getName()),
         MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME),
@@ -124,9 +124,9 @@ public class TestInMemoryJobCatalog {
 
     cat.put(js2);
     Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 2);
-    Assert.assertEquals(cat.getMetrics().getNumAddedJobs().getCount(), 2);
-    Assert.assertEquals(cat.getMetrics().getNumUpdatedJobs().getCount(), 1);
-    Assert.assertEquals(cat.getMetrics().getNumDeletedJobs().getCount(), 0);
+    Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 2);
+    Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 1);
+    Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 0);
     ma.assertEvent(Predicates.and(
         MetricsAssert.eqEventNamespace(JobCatalog.class.getName()),
         MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME),
@@ -139,9 +139,9 @@ public class TestInMemoryJobCatalog {
 
     cat.put(js1_3);
     Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 2);
-    Assert.assertEquals(cat.getMetrics().getNumAddedJobs().getCount(), 2);
-    Assert.assertEquals(cat.getMetrics().getNumUpdatedJobs().getCount(), 2);
-    Assert.assertEquals(cat.getMetrics().getNumDeletedJobs().getCount(), 0);
+    Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 2);
+    Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 2);
+    Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 0);
     ma.assertEvent(Predicates.and(
         MetricsAssert.eqEventNamespace(JobCatalog.class.getName()),
         MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME),
@@ -154,9 +154,9 @@ public class TestInMemoryJobCatalog {
 
     cat.remove(js2.getUri());
     Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 1);
-    Assert.assertEquals(cat.getMetrics().getNumAddedJobs().getCount(), 2);
-    Assert.assertEquals(cat.getMetrics().getNumUpdatedJobs().getCount(), 2);
-    Assert.assertEquals(cat.getMetrics().getNumDeletedJobs().getCount(), 1);
+    Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 2);
+    Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 2);
+    Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 1);
     ma.assertEvent(Predicates.and(
         MetricsAssert.eqEventNamespace(JobCatalog.class.getName()),
         MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME),
@@ -169,15 +169,15 @@ public class TestInMemoryJobCatalog {
 
     cat.remove(new URI("test:dummy_job"));
     Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 1);
-    Assert.assertEquals(cat.getMetrics().getNumAddedJobs().getCount(), 2);
-    Assert.assertEquals(cat.getMetrics().getNumUpdatedJobs().getCount(), 2);
-    Assert.assertEquals(cat.getMetrics().getNumDeletedJobs().getCount(), 1);
+    Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 2);
+    Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 2);
+    Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 1);
 
     cat.remove(js1_3.getUri());
     Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 0);
-    Assert.assertEquals(cat.getMetrics().getNumAddedJobs().getCount(), 2);
-    Assert.assertEquals(cat.getMetrics().getNumUpdatedJobs().getCount(), 2);
-    Assert.assertEquals(cat.getMetrics().getNumDeletedJobs().getCount(), 2);
+    Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 2);
+    Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 2);
+    Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 2);
     ma.assertEvent(Predicates.and(
         MetricsAssert.eqEventNamespace(JobCatalog.class.getName()),
         MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME),