You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2017/12/24 00:29:32 UTC
incubator-gobblin git commit: [GOBBLIN-349] Add gauges for number of
job scheduler for gobblin cluster
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 0fabaa7a4 -> ab034478c
[GOBBLIN-349] Add gauges for number of job scheduler for gobblin cluster
Change number counter to guage type for gobblin
cluster metrics
Fix build
Minor change
Closes #2212 from yukuai518/me
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ab034478
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ab034478
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ab034478
Branch: refs/heads/master
Commit: ab034478c9dfa9eaa3d1b81f698fec3606833328
Parents: 0fabaa7
Author: Kuai Yu <ku...@linkedin.com>
Authored: Sun Dec 24 05:59:19 2017 +0530
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Sun Dec 24 05:59:19 2017 +0530
----------------------------------------------------------------------
.../cluster/GobblinHelixJobScheduler.java | 80 ++++++++------------
.../apache/gobblin/runtime/api/JobCatalog.java | 68 +++++++----------
.../runtime/api/JobExecutionLauncher.java | 29 +++----
.../apache/gobblin/runtime/api/SpecCatalog.java | 65 +++++++---------
.../runtime/job_catalog/FSJobCatalog.java | 2 +-
.../job_catalog/NonObservingFSJobCatalog.java | 6 +-
.../job_catalog/TestInMemoryJobCatalog.java | 42 +++++-----
7 files changed, 126 insertions(+), 166 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ab034478/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 9fd5add..36ba542 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -20,15 +20,13 @@ package org.apache.gobblin.cluster;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.metrics.ContextAwareHistogram;
import org.apache.hadoop.fs.Path;
import org.apache.helix.HelixManager;
import org.slf4j.Logger;
@@ -36,7 +34,6 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
@@ -143,18 +140,18 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
private class Metrics extends StandardMetrics {
- private final ContextAwareCounter numJobsLaunched;
- private final ContextAwareCounter numJobsCompleted;
- private final ContextAwareCounter numJobsCommitted;
- private final ContextAwareCounter numJobsFailed;
- private final ContextAwareCounter numJobsCancelled;
- private final ContextAwareHistogram histogramJobsLaunched;
- private final ContextAwareHistogram histogramJobsCompleted;
- private final ContextAwareHistogram histogramJobsCommitted;
- private final ContextAwareHistogram histogramJobsFailed;
- private final ContextAwareHistogram histogramJobsCancelled;
-
+ private final AtomicLong totalJobsLaunched;
+ private final AtomicLong totalJobsCompleted;
+ private final AtomicLong totalJobsCommitted;
+ private final AtomicLong totalJobsFailed;
+ private final AtomicLong totalJobsCancelled;
+ private final ContextAwareGauge<Long> numJobsLaunched;
+ private final ContextAwareGauge<Long> numJobsCompleted;
+ private final ContextAwareGauge<Long> numJobsCommitted;
+ private final ContextAwareGauge<Long> numJobsFailed;
+ private final ContextAwareGauge<Long> numJobsCancelled;
private final ContextAwareGauge<Integer> numJobsRunning;
+
private final ContextAwareTimer timeForJobCompletion;
private final ContextAwareTimer timeForJobFailure;
private final ContextAwareTimer timeBeforeJobScheduling;
@@ -162,21 +159,19 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
public Metrics(final MetricContext metricContext) {
// All historical counters
- this.numJobsLaunched = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_LAUNCHED_COUNTER);
- this.numJobsCompleted = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMPLETED_COUNTER);
- this.numJobsCommitted = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMMITTED_COUNTER);
- this.numJobsFailed = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_FAILED_COUNTER);
- this.numJobsCancelled = metricContext.contextAwareCounter(JobExecutionLauncher.StandardMetrics.NUM_JOBS_CANCELLED_COUNTER);
-
- // Counters within last 1 minute
- this.histogramJobsLaunched = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_LAUNCHED_HISTOGRAM, 1, TimeUnit.MINUTES);
- this.histogramJobsCompleted = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMPLETED_HISTOGRAM, 1, TimeUnit.MINUTES);
- this.histogramJobsCommitted = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMMITTED_HISTOGRAM, 1, TimeUnit.MINUTES);
- this.histogramJobsFailed = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_FAILED_HISTOGRAM, 1, TimeUnit.MINUTES);
- this.histogramJobsCancelled = metricContext.contextAwareHistogram(JobExecutionLauncher.StandardMetrics.NUM_JOBS_CANCELLED_HISTOGRAM, 1, TimeUnit.MINUTES);
-
- this.numJobsRunning = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_RUNNING_GAUGE,
- ()->(int)(Metrics.this.numJobsLaunched.getCount() - Metrics.this.numJobsCompleted.getCount()));
+ this.totalJobsLaunched = new AtomicLong(0);
+ this.totalJobsCompleted = new AtomicLong(0);
+ this.totalJobsCommitted = new AtomicLong(0);
+ this.totalJobsFailed = new AtomicLong(0);
+ this.totalJobsCancelled = new AtomicLong(0);
+
+ this.numJobsLaunched = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_LAUNCHED, ()->this.totalJobsLaunched.get());
+ this.numJobsCompleted = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMPLETED, ()->this.totalJobsCompleted.get());
+ this.numJobsCommitted = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMMITTED, ()->this.totalJobsCommitted.get());
+ this.numJobsFailed = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_FAILED, ()->this.totalJobsFailed.get());
+ this.numJobsCancelled = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_CANCELLED, ()->this.totalJobsCancelled.get());
+ this.numJobsRunning = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_RUNNING,
+ ()->(int)(Metrics.this.totalJobsLaunched.get() - Metrics.this.totalJobsCompleted.get()));
this.timeForJobCompletion = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_COMPLETION, 1, TimeUnit.MINUTES);
this.timeForJobFailure = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_FAILURE,1, TimeUnit.MINUTES);
@@ -201,29 +196,18 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
@Override
public Collection<ContextAwareGauge<?>> getGauges() {
- return Collections.singleton(numJobsRunning);
+ return ImmutableList.of(numJobsRunning, numJobsLaunched, numJobsCompleted, numJobsCommitted, numJobsFailed, numJobsCancelled);
}
@Override
public Collection<ContextAwareCounter> getCounters() {
- List<ContextAwareCounter> counters = Lists.newArrayList();
- counters.add(numJobsLaunched);
- counters.add(numJobsCompleted);
- counters.add(numJobsCommitted);
- counters.add(numJobsFailed);
- counters.add(numJobsCancelled);
- return counters;
+ return ImmutableList.of();
}
@Override
public Collection<ContextAwareTimer> getTimers() {
return ImmutableList.of(timeForJobCompletion, timeForJobFailure, timeBeforeJobScheduling, timeBeforeJobLaunching);
}
-
- @Override
- public Collection<ContextAwareHistogram> getHistograms() {
- return ImmutableList.of(histogramJobsCompleted, histogramJobsLaunched, histogramJobsFailed, histogramJobsCancelled, histogramJobsCommitted);
- }
}
private class MetricsTrackingListener extends AbstractJobListener {
@@ -239,7 +223,7 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
super.onJobPrepare(jobContext);
jobContext.getJobState().setProp(START_TIME, Long.toString(System.nanoTime()));
if (GobblinHelixJobScheduler.this.isInstrumentationEnabled()) {
- metrics.numJobsLaunched.inc();
+ metrics.totalJobsLaunched.incrementAndGet();
}
}
@@ -249,13 +233,13 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
super.onJobCompletion(jobContext);
long startTime = jobContext.getJobState().getPropAsLong(START_TIME);
if (GobblinHelixJobScheduler.this.isInstrumentationEnabled()) {
- metrics.numJobsCompleted.inc();
+ metrics.totalJobsCompleted.incrementAndGet();
Instrumented.updateTimer(Optional.of(metrics.timeForJobCompletion), System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) {
- metrics.numJobsFailed.inc();
+ metrics.totalJobsFailed.incrementAndGet();
Instrumented.updateTimer(Optional.of(metrics.timeForJobFailure), System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
} else {
- metrics.numJobsCommitted.inc();
+ metrics.totalJobsCommitted.incrementAndGet();
}
}
}
@@ -265,7 +249,7 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
throws Exception {
super.onJobCancellation(jobContext);
if (GobblinHelixJobScheduler.this.isInstrumentationEnabled()) {
- metrics.numJobsCancelled.inc();
+ metrics.totalJobsCancelled.incrementAndGet();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ab034478/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
index 6c0ea5b..950b86d 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
@@ -18,10 +18,9 @@ package org.apache.gobblin.runtime.api;
import java.net.URI;
import java.util.Collection;
-import java.util.Collections;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
-import com.codahale.metrics.Gauge;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -33,7 +32,6 @@ import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.instrumented.StandardMetricsBridge;
import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.ContextAwareGauge;
-import org.apache.gobblin.metrics.ContextAwareHistogram;
import org.apache.gobblin.metrics.ContextAwareTimer;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.MetricContext;
@@ -67,42 +65,42 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable
@Slf4j
public static class StandardMetrics extends StandardMetricsBridge.StandardMetrics implements JobCatalogListener {
public static final String NUM_ACTIVE_JOBS_NAME = "numActiveJobs";
- public static final String NUM_ADDED_JOBS = "numAddedJobs";
- public static final String NUM_DELETED_JOBS = "numDeletedJobs";
- public static final String NUM_UPDATED_JOBS = "numUpdatedJobs";
+ public static final String TOTAL_ADD_CALLS = "totalAddCalls";
+ public static final String TOTAL_DELETE_CALLS = "totalDeleteCalls";
+ public static final String TOTAL_UPDATE_CALLS = "totalUpdateCalls";
public static final String TIME_FOR_JOB_CATALOG_GET = "timeForJobCatalogGet";
- public static final String HISTOGRAM_FOR_JOB_ADD = "histogramForJobAdd";
- public static final String HISTOGRAM_FOR_JOB_UPDATE = "histogramForJobUpdate";
- public static final String HISTOGRAM_FOR_JOB_DELETE = "histogramForJobDelete";
+
public static final String TRACKING_EVENT_NAME = "JobCatalogEvent";
public static final String JOB_ADDED_OPERATION_TYPE = "JobAdded";
public static final String JOB_DELETED_OPERATION_TYPE = "JobDeleted";
public static final String JOB_UPDATED_OPERATION_TYPE = "JobUpdated";
- @Getter private final ContextAwareGauge<Integer> numActiveJobs;
- @Getter private final ContextAwareCounter numAddedJobs;
- @Getter private final ContextAwareCounter numDeletedJobs;
- @Getter private final ContextAwareCounter numUpdatedJobs;
+ private final MetricContext metricsContext;
+ @Getter private final AtomicLong totalAddedJobs;
+ @Getter private final AtomicLong totalDeletedJobs;
+ @Getter private final AtomicLong totalUpdatedJobs;
@Getter private final ContextAwareTimer timeForJobCatalogGet;
- @Getter private final ContextAwareHistogram histogramForJobAdd;
- @Getter private final ContextAwareHistogram histogramForJobUpdate;
- @Getter private final ContextAwareHistogram histogramForJobDelete;
+ @Getter private final ContextAwareGauge<Long> totalAddCalls;
+ @Getter private final ContextAwareGauge<Long> totalDeleteCalls;
+ @Getter private final ContextAwareGauge<Long> totalUpdateCalls;
+ @Getter private final ContextAwareGauge<Integer> numActiveJobs;
public StandardMetrics(final JobCatalog jobCatalog) {
- MetricContext context = jobCatalog.getMetricContext();
- this.timeForJobCatalogGet = context.contextAwareTimer(TIME_FOR_JOB_CATALOG_GET, 1, TimeUnit.MINUTES);
- this.numAddedJobs = context.contextAwareCounter(NUM_ADDED_JOBS);
- this.numDeletedJobs = context.contextAwareCounter(NUM_DELETED_JOBS);
- this.numUpdatedJobs = context.contextAwareCounter(NUM_UPDATED_JOBS);
- this.numActiveJobs = context.newContextAwareGauge(NUM_ACTIVE_JOBS_NAME, ()->{
+ this.metricsContext = jobCatalog.getMetricContext();
+ this.totalAddedJobs = new AtomicLong(0);
+ this.totalDeletedJobs = new AtomicLong(0);
+ this.totalUpdatedJobs = new AtomicLong(0);
+
+ this.timeForJobCatalogGet = metricsContext.contextAwareTimer(TIME_FOR_JOB_CATALOG_GET, 1, TimeUnit.MINUTES);
+ this.totalAddCalls = metricsContext.newContextAwareGauge(TOTAL_ADD_CALLS, ()->this.totalAddedJobs.get());
+ this.totalUpdateCalls = metricsContext.newContextAwareGauge(TOTAL_UPDATE_CALLS, ()->this.totalUpdatedJobs.get());
+ this.totalDeleteCalls = metricsContext.newContextAwareGauge(TOTAL_DELETE_CALLS, ()->this.totalDeletedJobs.get());
+ this.numActiveJobs = metricsContext.newContextAwareGauge(NUM_ACTIVE_JOBS_NAME, ()->{
long startTime = System.currentTimeMillis();
int size = jobCatalog.getJobs().size();
updateGetJobTime(startTime);
return size;
});
- this.histogramForJobAdd = context.contextAwareHistogram(HISTOGRAM_FOR_JOB_ADD, 1, TimeUnit.MINUTES);
- this.histogramForJobUpdate = context.contextAwareHistogram(HISTOGRAM_FOR_JOB_UPDATE, 1, TimeUnit.MINUTES);
- this.histogramForJobDelete = context.contextAwareHistogram(HISTOGRAM_FOR_JOB_DELETE, 1, TimeUnit.MINUTES);
}
public void updateGetJobTime(long startTime) {
@@ -111,8 +109,7 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable
}
@Override public void onAddJob(JobSpec addedJob) {
- this.numAddedJobs.inc();
- this.histogramForJobAdd.update(1);
+ this.totalAddedJobs.incrementAndGet();
submitTrackingEvent(addedJob, JOB_ADDED_OPERATION_TYPE);
}
@@ -130,41 +127,34 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable
.put(GobblinMetricsKeys.JOB_SPEC_VERSION_META, jobSpecVersion)
.build())
.build();
- this.numAddedJobs.getContext().submitEvent(e);
+ this.metricsContext.submitEvent(e);
}
@Override
public void onDeleteJob(URI deletedJobURI, String deletedJobVersion) {
- this.numDeletedJobs.inc();
- this.histogramForJobDelete.update(1);
+ this.totalDeletedJobs.incrementAndGet();
submitTrackingEvent(deletedJobURI, deletedJobVersion, JOB_DELETED_OPERATION_TYPE);
}
@Override
public void onUpdateJob(JobSpec updatedJob) {
- this.numUpdatedJobs.inc();
- this.histogramForJobUpdate.update(1);
+ this.totalUpdatedJobs.incrementAndGet();
submitTrackingEvent(updatedJob, JOB_UPDATED_OPERATION_TYPE);
}
@Override
public Collection<ContextAwareGauge<?>> getGauges() {
- return Collections.singleton(this.numActiveJobs);
+ return ImmutableList.of(totalAddCalls, totalDeleteCalls, totalUpdateCalls, numActiveJobs);
}
@Override
public Collection<ContextAwareCounter> getCounters() {
- return ImmutableList.of(numAddedJobs, numDeletedJobs, numUpdatedJobs);
+ return ImmutableList.of();
}
@Override
public Collection<ContextAwareTimer> getTimers() {
return ImmutableList.of(timeForJobCatalogGet);
}
-
- @Override
- public Collection<ContextAwareHistogram> getHistograms() {
- return ImmutableList.of(histogramForJobAdd, histogramForJobDelete, histogramForJobUpdate);
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ab034478/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
index fb54139..a7e5878 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
@@ -37,17 +37,12 @@ public interface JobExecutionLauncher extends Instrumentable {
StandardMetrics getMetrics();
public static class StandardMetrics {
- public static final String NUM_JOBS_LAUNCHED_COUNTER = "numJobsLaunched";
- public static final String NUM_JOBS_COMPLETED_COUNTER = "numJobsCompleted";
- public static final String NUM_JOBS_COMMITTED_COUNTER = "numJobsCommitted";
- public static final String NUM_JOBS_FAILED_COUNTER = "numJobsFailed";
- public static final String NUM_JOBS_CANCELLED_COUNTER = "numJobsCancelled";
- public static final String NUM_JOBS_RUNNING_GAUGE = "numJobsRunning";
- public static final String NUM_JOBS_LAUNCHED_HISTOGRAM = "histogramJobsLaunched";
- public static final String NUM_JOBS_COMPLETED_HISTOGRAM = "histogramJobsCompleted";
- public static final String NUM_JOBS_COMMITTED_HISTOGRAM = "histogramJobsCommitted";
- public static final String NUM_JOBS_FAILED_HISTOGRAM = "histogramJobsFailed";
- public static final String NUM_JOBS_CANCELLED_HISTOGRAM = "histogramJobsCancelled";
+ public static final String NUM_JOBS_LAUNCHED = "numJobsLaunched";
+ public static final String NUM_JOBS_COMPLETED = "numJobsCompleted";
+ public static final String NUM_JOBS_COMMITTED = "numJobsCommitted";
+ public static final String NUM_JOBS_FAILED = "numJobsFailed";
+ public static final String NUM_JOBS_CANCELLED = "numJobsCancelled";
+ public static final String NUM_JOBS_RUNNING = "numJobsRunning";
public static final String TIMER_FOR_JOB_COMPLETION = "timerForJobCompletion";
public static final String TIMER_FOR_JOB_FAILURE = "timerForJobFailure";
@@ -70,12 +65,12 @@ public interface JobExecutionLauncher extends Instrumentable {
@Getter private final ContextAwareGauge<Integer> numJobsRunning;
public StandardMetrics(final JobExecutionLauncher parent) {
- this.numJobsLaunched = parent.getMetricContext().contextAwareCounter(NUM_JOBS_LAUNCHED_COUNTER);
- this.numJobsCompleted = parent.getMetricContext().contextAwareCounter(NUM_JOBS_COMPLETED_COUNTER);
- this.numJobsCommitted = parent.getMetricContext().contextAwareCounter(NUM_JOBS_COMMITTED_COUNTER);
- this.numJobsFailed = parent.getMetricContext().contextAwareCounter(NUM_JOBS_FAILED_COUNTER);
- this.numJobsCancelled = parent.getMetricContext().contextAwareCounter(NUM_JOBS_CANCELLED_COUNTER);
- this.numJobsRunning = parent.getMetricContext().newContextAwareGauge(NUM_JOBS_RUNNING_GAUGE,
+ this.numJobsLaunched = parent.getMetricContext().contextAwareCounter(NUM_JOBS_LAUNCHED);
+ this.numJobsCompleted = parent.getMetricContext().contextAwareCounter(NUM_JOBS_COMPLETED);
+ this.numJobsCommitted = parent.getMetricContext().contextAwareCounter(NUM_JOBS_COMMITTED);
+ this.numJobsFailed = parent.getMetricContext().contextAwareCounter(NUM_JOBS_FAILED);
+ this.numJobsCancelled = parent.getMetricContext().contextAwareCounter(NUM_JOBS_CANCELLED);
+ this.numJobsRunning = parent.getMetricContext().newContextAwareGauge(NUM_JOBS_RUNNING,
new Gauge<Integer>() {
@Override public Integer getValue() {
return (int)(StandardMetrics.this.getNumJobsLaunched().getCount() -
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ab034478/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
index 4b85ea9..6e8510f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
@@ -19,10 +19,9 @@ package org.apache.gobblin.runtime.api;
import java.net.URI;
import java.util.Collection;
-import java.util.Collections;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
-import com.codahale.metrics.Gauge;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -32,7 +31,6 @@ import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.instrumented.StandardMetricsBridge;
import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.ContextAwareGauge;
-import org.apache.gobblin.metrics.ContextAwareHistogram;
import org.apache.gobblin.metrics.ContextAwareTimer;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.MetricContext;
@@ -62,42 +60,41 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetr
@Slf4j
public static class StandardMetrics extends StandardMetricsBridge.StandardMetrics implements SpecCatalogListener {
public static final String NUM_ACTIVE_SPECS_NAME = "numActiveSpecs";
- public static final String NUM_ADDED_SPECS = "numAddedSpecs";
- public static final String NUM_DELETED_SPECS = "numDeletedSpecs";
- public static final String NUM_UPDATED_SPECS = "numUpdatedSpecs";
+ public static final String TOTAL_ADD_CALLS = "totalAddCalls";
+ public static final String TOTAL_DELETE_CALLS = "totalDeleteCalls";
+ public static final String TOTAL_UPDATE_CALLS = "totalUpdateCalls";
public static final String TRACKING_EVENT_NAME = "SpecCatalogEvent";
public static final String SPEC_ADDED_OPERATION_TYPE = "SpecAdded";
public static final String SPEC_DELETED_OPERATION_TYPE = "SpecDeleted";
public static final String SPEC_UPDATED_OPERATION_TYPE = "SpecUpdated";
public static final String TIME_FOR_SPEC_CATALOG_GET = "timeForSpecCatalogGet";
- public static final String HISTOGRAM_FOR_SPEC_ADD = "histogramForSpecAdd";
- public static final String HISTOGRAM_FOR_SPEC_UPDATE = "histogramForSpecUpdate";
- public static final String HISTOGRAM_FOR_SPEC_DELETE = "histogramForSpecDelete";
+ private final MetricContext metricsContext;
+ @Getter private final AtomicLong totalAddedSpecs;
+ @Getter private final AtomicLong totalDeletedSpecs;
+ @Getter private final AtomicLong totalUpdatedSpecs;
+ @Getter private final ContextAwareGauge<Long> totalAddCalls;
+ @Getter private final ContextAwareGauge<Long> totalDeleteCalls;
+ @Getter private final ContextAwareGauge<Long> totalUpdateCalls;
@Getter private final ContextAwareGauge<Integer> numActiveSpecs;
- @Getter private final ContextAwareCounter numAddedSpecs;
- @Getter private final ContextAwareCounter numDeletedSpecs;
- @Getter private final ContextAwareCounter numUpdatedSpecs;
+
@Getter private final ContextAwareTimer timeForSpecCatalogGet;
- @Getter private final ContextAwareHistogram histogramForSpecAdd;
- @Getter private final ContextAwareHistogram histogramForSpecUpdate;
- @Getter private final ContextAwareHistogram histogramForSpecDelete;
public StandardMetrics(final SpecCatalog specCatalog) {
- MetricContext context = specCatalog.getMetricContext();
- this.timeForSpecCatalogGet = context.contextAwareTimer(TIME_FOR_SPEC_CATALOG_GET, 1, TimeUnit.MINUTES);
- this.numAddedSpecs = context.contextAwareCounter(NUM_ADDED_SPECS);
- this.numDeletedSpecs = context.contextAwareCounter(NUM_DELETED_SPECS);
- this.numUpdatedSpecs = context.contextAwareCounter(NUM_UPDATED_SPECS);
- this.numActiveSpecs = context.newContextAwareGauge(NUM_ACTIVE_SPECS_NAME, ()->{
+ this.metricsContext = specCatalog.getMetricContext();
+ this.timeForSpecCatalogGet = metricsContext.contextAwareTimer(TIME_FOR_SPEC_CATALOG_GET, 1, TimeUnit.MINUTES);
+ this.totalAddedSpecs = new AtomicLong(0);
+ this.totalDeletedSpecs = new AtomicLong(0);
+ this.totalUpdatedSpecs = new AtomicLong(0);
+ this.numActiveSpecs = metricsContext.newContextAwareGauge(NUM_ACTIVE_SPECS_NAME, ()->{
long startTime = System.currentTimeMillis();
int size = specCatalog.getSpecs().size();
updateGetSpecTime(startTime);
return size;
});
- this.histogramForSpecAdd = context.contextAwareHistogram(HISTOGRAM_FOR_SPEC_ADD, 1, TimeUnit.MINUTES);
- this.histogramForSpecUpdate = context.contextAwareHistogram(HISTOGRAM_FOR_SPEC_UPDATE, 1, TimeUnit.MINUTES);
- this.histogramForSpecDelete = context.contextAwareHistogram(HISTOGRAM_FOR_SPEC_DELETE, 1, TimeUnit.MINUTES);
+ this.totalAddCalls = metricsContext.newContextAwareGauge(TOTAL_ADD_CALLS, ()->this.totalAddedSpecs.get());
+ this.totalUpdateCalls = metricsContext.newContextAwareGauge(TOTAL_UPDATE_CALLS, ()->this.totalUpdatedSpecs.get());
+ this.totalDeleteCalls = metricsContext.newContextAwareGauge(TOTAL_DELETE_CALLS, ()->this.totalDeletedSpecs.get());
}
public void updateGetSpecTime(long startTime) {
@@ -107,17 +104,12 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetr
@Override
public Collection<ContextAwareGauge<?>> getGauges() {
- return Collections.singleton(this.numActiveSpecs);
+ return ImmutableList.of(numActiveSpecs, totalAddCalls, totalUpdateCalls, totalDeleteCalls);
}
@Override
public Collection<ContextAwareCounter> getCounters() {
- return ImmutableList.of(numAddedSpecs, numDeletedSpecs, numUpdatedSpecs);
- }
-
- @Override
- public Collection<ContextAwareHistogram> getHistograms() {
- return ImmutableList.of(histogramForSpecAdd, histogramForSpecDelete, histogramForSpecUpdate);
+ return ImmutableList.of();
}
@Override
@@ -126,8 +118,7 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetr
}
@Override public void onAddSpec(Spec addedSpec) {
- this.numAddedSpecs.inc();
- this.histogramForSpecAdd.update(1);
+ this.totalAddedSpecs.incrementAndGet();
submitTrackingEvent(addedSpec, SPEC_ADDED_OPERATION_TYPE);
}
@@ -145,20 +136,18 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetr
.put(GobblinMetricsKeys.SPEC_VERSION_META, specSpecVersion)
.build())
.build();
- this.numAddedSpecs.getContext().submitEvent(e);
+ this.metricsContext.submitEvent(e);
}
@Override
public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
- this.numDeletedSpecs.inc();
- this.histogramForSpecDelete.update(1);
+ this.totalDeletedSpecs.incrementAndGet();
submitTrackingEvent(deletedSpecURI, deletedSpecVersion, SPEC_DELETED_OPERATION_TYPE);
}
@Override
public void onUpdateSpec(Spec updatedSpec) {
- this.numUpdatedSpecs.inc();
- this.histogramForSpecUpdate.update(1);
+ this.totalUpdatedSpecs.incrementAndGet();
submitTrackingEvent(updatedSpec, SPEC_UPDATED_OPERATION_TYPE);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ab034478/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java
index 8d89b97..0f99235 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java
@@ -63,7 +63,7 @@ public class FSJobCatalog extends ImmutableFSJobCatalog implements MutableJobCat
private static final Logger LOGGER = LoggerFactory.getLogger(FSJobCatalog.class);
public static final String CONF_EXTENSION = ".conf";
private static final String FS_SCHEME = "FS";
- private final MutableStandardMetrics mutableMetrics;
+ protected final MutableStandardMetrics mutableMetrics;
/**
* Initialize the JobCatalog, fetch all jobs in jobConfDirPath.
* @param sysConfig
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ab034478/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java
index eebf6f9..76adca4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java
@@ -77,11 +77,12 @@ public class NonObservingFSJobCatalog extends FSJobCatalog {
Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
Preconditions.checkNotNull(jobSpec);
try {
+ long startTime = System.currentTimeMillis();
Path jobSpecPath = getPathForURI(this.jobConfDirPath, jobSpec.getUri());
boolean isUpdate = fs.exists(jobSpecPath);
materializedJobSpec(jobSpecPath, jobSpec, this.fs);
-
+ this.mutableMetrics.updatePutJobTime(startTime);
if (isUpdate) {
this.listeners.onUpdateJob(jobSpec);
} else {
@@ -103,12 +104,13 @@ public class NonObservingFSJobCatalog extends FSJobCatalog {
public synchronized void remove(URI jobURI) {
Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
try {
+ long startTime = System.currentTimeMillis();
JobSpec jobSpec = getJobSpec(jobURI);
-
Path jobSpecPath = getPathForURI(this.jobConfDirPath, jobURI);
if (fs.exists(jobSpecPath)) {
fs.delete(jobSpecPath, false);
+ this.mutableMetrics.updateRemoveJobTime(startTime);
} else {
LOGGER.warn("No file with URI:" + jobSpecPath + " is found. Deletion failed.");
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ab034478/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/TestInMemoryJobCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/TestInMemoryJobCatalog.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/TestInMemoryJobCatalog.java
index 5920f78..026c888 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/TestInMemoryJobCatalog.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_catalog/TestInMemoryJobCatalog.java
@@ -94,9 +94,9 @@ public class TestInMemoryJobCatalog {
cat.put(js1_1);
Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 1);
- Assert.assertEquals(cat.getMetrics().getNumAddedJobs().getCount(), 1);
- Assert.assertEquals(cat.getMetrics().getNumUpdatedJobs().getCount(), 0);
- Assert.assertEquals(cat.getMetrics().getNumDeletedJobs().getCount(), 0);
+ Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 1);
+ Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 0);
+ Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 0);
ma.assertEvent(Predicates.and(
MetricsAssert.eqEventNamespace(JobCatalog.class.getName()),
MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME),
@@ -109,9 +109,9 @@ public class TestInMemoryJobCatalog {
cat.put(js1_2);
Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 1);
- Assert.assertEquals(cat.getMetrics().getNumAddedJobs().getCount(), 1);
- Assert.assertEquals(cat.getMetrics().getNumUpdatedJobs().getCount(), 1);
- Assert.assertEquals(cat.getMetrics().getNumDeletedJobs().getCount(), 0);
+ Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 1);
+ Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 1);
+ Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 0);
ma.assertEvent(Predicates.and(
MetricsAssert.eqEventNamespace(JobCatalog.class.getName()),
MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME),
@@ -124,9 +124,9 @@ public class TestInMemoryJobCatalog {
cat.put(js2);
Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 2);
- Assert.assertEquals(cat.getMetrics().getNumAddedJobs().getCount(), 2);
- Assert.assertEquals(cat.getMetrics().getNumUpdatedJobs().getCount(), 1);
- Assert.assertEquals(cat.getMetrics().getNumDeletedJobs().getCount(), 0);
+ Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 2);
+ Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 1);
+ Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 0);
ma.assertEvent(Predicates.and(
MetricsAssert.eqEventNamespace(JobCatalog.class.getName()),
MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME),
@@ -139,9 +139,9 @@ public class TestInMemoryJobCatalog {
cat.put(js1_3);
Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 2);
- Assert.assertEquals(cat.getMetrics().getNumAddedJobs().getCount(), 2);
- Assert.assertEquals(cat.getMetrics().getNumUpdatedJobs().getCount(), 2);
- Assert.assertEquals(cat.getMetrics().getNumDeletedJobs().getCount(), 0);
+ Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 2);
+ Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 2);
+ Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 0);
ma.assertEvent(Predicates.and(
MetricsAssert.eqEventNamespace(JobCatalog.class.getName()),
MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME),
@@ -154,9 +154,9 @@ public class TestInMemoryJobCatalog {
cat.remove(js2.getUri());
Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 1);
- Assert.assertEquals(cat.getMetrics().getNumAddedJobs().getCount(), 2);
- Assert.assertEquals(cat.getMetrics().getNumUpdatedJobs().getCount(), 2);
- Assert.assertEquals(cat.getMetrics().getNumDeletedJobs().getCount(), 1);
+ Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 2);
+ Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 2);
+ Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 1);
ma.assertEvent(Predicates.and(
MetricsAssert.eqEventNamespace(JobCatalog.class.getName()),
MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME),
@@ -169,15 +169,15 @@ public class TestInMemoryJobCatalog {
cat.remove(new URI("test:dummy_job"));
Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 1);
- Assert.assertEquals(cat.getMetrics().getNumAddedJobs().getCount(), 2);
- Assert.assertEquals(cat.getMetrics().getNumUpdatedJobs().getCount(), 2);
- Assert.assertEquals(cat.getMetrics().getNumDeletedJobs().getCount(), 1);
+ Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 2);
+ Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 2);
+ Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 1);
cat.remove(js1_3.getUri());
Assert.assertEquals(cat.getMetrics().getNumActiveJobs().getValue().intValue(), 0);
- Assert.assertEquals(cat.getMetrics().getNumAddedJobs().getCount(), 2);
- Assert.assertEquals(cat.getMetrics().getNumUpdatedJobs().getCount(), 2);
- Assert.assertEquals(cat.getMetrics().getNumDeletedJobs().getCount(), 2);
+ Assert.assertEquals(cat.getMetrics().getTotalAddCalls().getValue().longValue(), 2);
+ Assert.assertEquals(cat.getMetrics().getTotalUpdateCalls().getValue().longValue(), 2);
+ Assert.assertEquals(cat.getMetrics().getTotalDeleteCalls().getValue().longValue(), 2);
ma.assertEvent(Predicates.and(
MetricsAssert.eqEventNamespace(JobCatalog.class.getName()),
MetricsAssert.eqEventName(JobCatalog.StandardMetrics.TRACKING_EVENT_NAME),