You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:30:59 UTC
[41/50] incubator-gobblin git commit: [GOBBLIN-419] Add more metrics
for cluster job monitoring
[GOBBLIN-419] Add more metrics for cluster job monitoring
Closes #2296 from yukuai518/metrics
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/5e6bfb07
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/5e6bfb07
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/5e6bfb07
Branch: refs/heads/0.12.0
Commit: 5e6bfb079cfdd3d2026e7cd674dd6673933437d3
Parents: 4c15fde
Author: Kuai Yu <ku...@linkedin.com>
Authored: Wed Feb 28 09:04:36 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Feb 28 09:04:36 2018 -0800
----------------------------------------------------------------------
.../configuration/ConfigurationKeys.java | 5 +
.../gobblin/cluster/GobblinClusterManager.java | 31 ++---
.../cluster/GobblinHelixJobLauncher.java | 3 +
.../cluster/GobblinHelixJobScheduler.java | 125 +++++++------------
.../gobblin/cluster/GobblinTaskRunner.java | 82 +++++++++---
.../cluster/GobblinTaskRunnerMetrics.java | 78 ++++++++++++
.../instrumented/StandardMetricsBridge.java | 52 ++++----
.../service/StreamingKafkaSpecConsumer.java | 44 ++-----
.../apache/gobblin/runtime/TaskExecutor.java | 33 ++++-
.../apache/gobblin/runtime/api/JobCatalog.java | 39 +++---
.../runtime/api/JobExecutionLauncher.java | 6 +-
.../gobblin/runtime/api/MutableJobCatalog.java | 23 ++--
.../gobblin/runtime/api/MutableSpecCatalog.java | 23 ++--
.../apache/gobblin/runtime/api/SpecCatalog.java | 39 +++---
.../runtime/job_catalog/FSJobCatalog.java | 4 +-
.../job_catalog/ImmutableFSJobCatalog.java | 2 +-
.../runtime/job_catalog/JobCatalogBase.java | 12 +-
.../runtime/spec_catalog/FlowCatalog.java | 2 +-
.../runtime/spec_catalog/TopologyCatalog.java | 2 +-
.../modules/core/GobblinServiceManager.java | 28 +----
20 files changed, 359 insertions(+), 274 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index c80ceaf..612fd8b 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -169,6 +169,9 @@ public class ConfigurationKeys {
public static final String WORK_UNIT_RETRY_POLICY_KEY = "workunit.retry.policy";
public static final String WORK_UNIT_RETRY_ENABLED_KEY = "workunit.retry.enabled";
+ public static final String WORK_UNIT_CREATION_TIME_IN_MILLIS = "workunit.creation.time.in.millis";
+ public static final String WORK_UNIT_CREATION_AND_RUN_INTERVAL = "workunit.creation.and.run.interval";
+
public static final String JOB_RUN_ONCE_KEY = "job.runonce";
public static final String JOB_DISABLED_KEY = "job.disabled";
public static final String JOB_JAR_FILES_KEY = "job.jars";
@@ -631,6 +634,8 @@ public class ConfigurationKeys {
public static final String METRICS_REPORT_INTERVAL_KEY = METRICS_CONFIGURATIONS_PREFIX + "report.interval";
public static final String DEFAULT_METRICS_REPORT_INTERVAL = Long.toString(TimeUnit.SECONDS.toMillis(30));
public static final String METRIC_CONTEXT_NAME_KEY = "metrics.context.name";
+ public static final String METRIC_TIMER_WINDOW_SIZE_IN_MINUTES = METRICS_CONFIGURATIONS_PREFIX + "timer.window.size.in.minutes";
+ public static final int DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES = 15;
// File-based reporting
public static final String METRICS_REPORTING_FILE_ENABLED_KEY =
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 3393df6..d57c61e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -40,6 +40,7 @@ import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.instrumented.StandardMetricsBridge;
import org.apache.gobblin.metrics.ContextAwareHistogram;
+import org.apache.gobblin.metrics.ContextAwareMetric;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.hadoop.conf.Configuration;
@@ -155,7 +156,7 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
private GobblinHelixJobScheduler jobScheduler;
@Getter
private JobConfigurationManager jobConfigurationManager;
-
+
private final String clusterName;
private final Config config;
private final MetricContext metricContext;
@@ -165,7 +166,7 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
this.clusterName = clusterName;
this.config = config;
this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.getClass());
- this.metrics = new Metrics(this.metricContext);
+ this.metrics = new Metrics(this.metricContext, this.config);
this.isStandaloneMode = ConfigUtils.getBoolean(config, GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE_KEY,
GobblinClusterConfigurationKeys.DEFAULT_STANDALONE_CLUSTER_MODE);
@@ -557,21 +558,6 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
return GobblinMetrics.isEnabled(ConfigUtils.configToProperties(this.config));
}
- @Override
- public List<Tag<?>> generateTags(State state) {
- return ImmutableList.of();
- }
-
- @Override
- public void switchMetricContext(List<Tag<?>> tags) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void switchMetricContext(MetricContext context) {
- throw new UnsupportedOperationException();
- }
-
/**
* A custom implementation of {@link LiveInstanceChangeListener}.
*/
@@ -588,19 +574,16 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
private class Metrics extends StandardMetrics {
public static final String CLUSTER_LEADERSHIP_CHANGE = "clusterLeadershipChange";
private ContextAwareHistogram clusterLeadershipChange;
- public Metrics(final MetricContext metricContext) {
- clusterLeadershipChange = metricContext.contextAwareHistogram(CLUSTER_LEADERSHIP_CHANGE, 1, TimeUnit.MINUTES);
+ public Metrics(final MetricContext metricContext, final Config config) {
+ int timeWindowSizeInMinutes = ConfigUtils.getInt(config, ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES, ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES);
+ this.clusterLeadershipChange = metricContext.contextAwareHistogram(CLUSTER_LEADERSHIP_CHANGE, timeWindowSizeInMinutes, TimeUnit.MINUTES);
+ this.contextAwareMetrics.add(clusterLeadershipChange);
}
@Override
public String getName() {
return GobblinClusterManager.class.getName();
}
-
- @Override
- public Collection<ContextAwareHistogram> getHistograms() {
- return ImmutableList.of(this.clusterLeadershipChange);
- }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 62c9b3f..d502462 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -183,6 +183,9 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
@Override
protected void runWorkUnits(List<WorkUnit> workUnits) throws Exception {
try {
+ long workUnitStartTime = System.currentTimeMillis();
+ workUnits.forEach((k) -> k.setProp(ConfigurationKeys.WORK_UNIT_CREATION_TIME_IN_MILLIS, workUnitStartTime));
+
// Start the output TaskState collector service
this.taskStateCollectorService.startAsync().awaitRunning();
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 141e3d1..48b12f2 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -28,13 +28,13 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.gobblin.util.ConfigUtils;
import org.apache.hadoop.fs.Path;
import org.apache.helix.HelixManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.eventbus.EventBus;
@@ -47,7 +47,7 @@ import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent;
import org.apache.gobblin.cluster.event.UpdateJobConfigArrivalEvent;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.instrumented.StandardMetricsBridge;
-import org.apache.gobblin.metrics.ContextAwareCounter;
+import org.apache.gobblin.metrics.ContextAwareMetric;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.ContextAwareTimer;
import org.apache.gobblin.metrics.GobblinMetrics;
@@ -121,21 +121,6 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
}
@Override
- public List<Tag<?>> generateTags(org.apache.gobblin.configuration.State state) {
- return null;
- }
-
- @Override
- public void switchMetricContext(List<Tag<?>> tags) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void switchMetricContext(MetricContext context) {
- throw new UnsupportedOperationException();
- }
-
- @Override
public StandardMetrics getStandardMetrics() {
return metrics;
}
@@ -147,29 +132,25 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
private final AtomicLong totalJobsCommitted;
private final AtomicLong totalJobsFailed;
private final AtomicLong totalJobsCancelled;
- private final ContextAwareGauge<Long> numJobsLaunched;
- private final ContextAwareGauge<Long> numJobsCompleted;
- private final ContextAwareGauge<Long> numJobsCommitted;
- private final ContextAwareGauge<Long> numJobsFailed;
- private final ContextAwareGauge<Long> numJobsCancelled;
- private final ContextAwareGauge<Integer> numJobsRunning;
-
- private final ContextAwareTimer timeForJobCompletion;
- private final ContextAwareTimer timeForJobFailure;
+
+ private final ContextAwareTimer timeForCompletedJobs;
+ private final ContextAwareTimer timeForFailedJobs;
+ private final ContextAwareTimer timeForCommittedJobs;
private final ContextAwareTimer timeBeforeJobScheduling;
private final ContextAwareTimer timeBeforeJobLaunching;
+ private final ContextAwareTimer timeBetwenJobSchedulingAndLaunching;
private final ThreadPoolExecutor threadPoolExecutor;
- private final ContextAwareGauge<Integer> executorActiveCount;
- private final ContextAwareGauge<Integer> executorMaximumPoolSize;
- private final ContextAwareGauge<Integer> executorPoolSize;
- private final ContextAwareGauge<Integer> executorCorePoolSize;
- private final ContextAwareGauge<Integer> executorQueueSize;
public Metrics(final MetricContext metricContext) {
// Thread executor reference from job scheduler
this.threadPoolExecutor = (ThreadPoolExecutor)GobblinHelixJobScheduler.this.jobExecutor;
+ // timer duration setup
+ int windowSize = ConfigUtils.getInt(ConfigUtils.propertiesToConfig(GobblinHelixJobScheduler.this.properties),
+ ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES,
+ ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES);
+
// All historical counters
this.totalJobsLaunched = new AtomicLong(0);
this.totalJobsCompleted = new AtomicLong(0);
@@ -177,25 +158,34 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
this.totalJobsFailed = new AtomicLong(0);
this.totalJobsCancelled = new AtomicLong(0);
- this.numJobsLaunched = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_LAUNCHED, ()->this.totalJobsLaunched.get());
- this.numJobsCompleted = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMPLETED, ()->this.totalJobsCompleted.get());
- this.numJobsCommitted = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMMITTED, ()->this.totalJobsCommitted.get());
- this.numJobsFailed = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_FAILED, ()->this.totalJobsFailed.get());
- this.numJobsCancelled = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_CANCELLED, ()->this.totalJobsCancelled.get());
- this.numJobsRunning = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_RUNNING,
- ()->(int)(Metrics.this.totalJobsLaunched.get() - Metrics.this.totalJobsCompleted.get()));
-
- this.timeForJobCompletion = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_COMPLETION, 1, TimeUnit.MINUTES);
- this.timeForJobFailure = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_JOB_FAILURE,1, TimeUnit.MINUTES);
- this.timeBeforeJobScheduling = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BEFORE_JOB_SCHEDULING, 1, TimeUnit.MINUTES);
- this.timeBeforeJobLaunching = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BEFORE_JOB_LAUNCHING, 1, TimeUnit.MINUTES);
+ this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_LAUNCHED, ()->this.totalJobsLaunched.get()));
+ this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMPLETED, ()->this.totalJobsCompleted.get()));
+ this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMMITTED, ()->this.totalJobsCommitted.get()));
+ this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_FAILED, ()->this.totalJobsFailed.get()));
+ this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_CANCELLED, ()->this.totalJobsCancelled.get()));
+ this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_RUNNING,
+ ()->(int)(Metrics.this.totalJobsLaunched.get() - Metrics.this.totalJobsCompleted.get())));
+
+ this.timeForCompletedJobs = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_COMPLETED_JOBS, windowSize, TimeUnit.MINUTES);
+ this.timeForFailedJobs = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_FAILED_JOBS, windowSize, TimeUnit.MINUTES);
+ this.timeForCommittedJobs = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_COMMITTED_JOBS, windowSize, TimeUnit.MINUTES);
+ this.timeBeforeJobScheduling = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BEFORE_JOB_SCHEDULING, windowSize, TimeUnit.MINUTES);
+ this.timeBeforeJobLaunching = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BEFORE_JOB_LAUNCHING, windowSize, TimeUnit.MINUTES);
+ this.timeBetwenJobSchedulingAndLaunching = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BETWEEN_JOB_SCHEDULING_AND_LAUNCHING, windowSize, TimeUnit.MINUTES);
// executor metrics
- this.executorActiveCount = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_ACTIVE_COUNT, ()->this.threadPoolExecutor.getActiveCount());
- this.executorMaximumPoolSize = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_MAX_POOL_SIZE, ()->this.threadPoolExecutor.getMaximumPoolSize());
- this.executorPoolSize = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_POOL_SIZE, ()->this.threadPoolExecutor.getPoolSize());
- this.executorCorePoolSize = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_CORE_POOL_SIZE, ()->this.threadPoolExecutor.getCorePoolSize());
- this.executorQueueSize = metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_QUEUE_SIZE, ()->this.threadPoolExecutor.getQueue().size());
+ this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_ACTIVE_COUNT, ()->this.threadPoolExecutor.getActiveCount()));
+ this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_MAX_POOL_SIZE, ()->this.threadPoolExecutor.getMaximumPoolSize()));
+ this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_POOL_SIZE, ()->this.threadPoolExecutor.getPoolSize()));
+ this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_CORE_POOL_SIZE, ()->this.threadPoolExecutor.getCorePoolSize()));
+ this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_QUEUE_SIZE, ()->this.threadPoolExecutor.getQueue().size()));
+
+ this.contextAwareMetrics.add(timeForCommittedJobs);
+ this.contextAwareMetrics.add(timeForCompletedJobs);
+ this.contextAwareMetrics.add(timeForFailedJobs);
+ this.contextAwareMetrics.add(timeBeforeJobScheduling);
+ this.contextAwareMetrics.add(timeBeforeJobLaunching);
+ this.contextAwareMetrics.add(timeBetwenJobSchedulingAndLaunching);
}
private void updateTimeBeforeJobScheduling (Properties jobConfig) {
@@ -208,36 +198,13 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
Instrumented.updateTimer(Optional.of(timeBeforeJobLaunching), System.currentTimeMillis() - jobCreationTime, TimeUnit.MILLISECONDS);
}
- @Override
- public String getName() {
- return GobblinHelixJobScheduler.class.getName();
- }
-
- @Override
- public Collection<ContextAwareGauge<?>> getGauges() {
- List<ContextAwareGauge<?>> list = Lists.newArrayList();
- list.add(numJobsRunning);
- list.add(numJobsLaunched);
- list.add(numJobsCompleted);
- list.add(numJobsCommitted);
- list.add(numJobsFailed);
- list.add(numJobsCancelled);
- list.add(executorActiveCount);
- list.add(executorMaximumPoolSize);
- list.add(executorPoolSize);
- list.add(executorCorePoolSize);
- list.add(executorQueueSize);
- return list;
+ private void updateTimeBetweenJobSchedulingAndJobLaunching (long scheduledTime, long launchingTime) {
+ Instrumented.updateTimer(Optional.of(timeBetwenJobSchedulingAndLaunching), launchingTime - scheduledTime, TimeUnit.MILLISECONDS);
}
@Override
- public Collection<ContextAwareCounter> getCounters() {
- return ImmutableList.of();
- }
-
- @Override
- public Collection<ContextAwareTimer> getTimers() {
- return ImmutableList.of(timeForJobCompletion, timeForJobFailure, timeBeforeJobScheduling, timeBeforeJobLaunching);
+ public String getName() {
+ return GobblinHelixJobScheduler.class.getName();
}
}
@@ -265,12 +232,13 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
long startTime = jobContext.getJobState().getPropAsLong(START_TIME);
if (GobblinHelixJobScheduler.this.isInstrumentationEnabled()) {
metrics.totalJobsCompleted.incrementAndGet();
- Instrumented.updateTimer(Optional.of(metrics.timeForJobCompletion), System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+ Instrumented.updateTimer(Optional.of(metrics.timeForCompletedJobs), System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) {
metrics.totalJobsFailed.incrementAndGet();
- Instrumented.updateTimer(Optional.of(metrics.timeForJobFailure), System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+ Instrumented.updateTimer(Optional.of(metrics.timeForFailedJobs), System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
} else {
metrics.totalJobsCommitted.incrementAndGet();
+ Instrumented.updateTimer(Optional.of(metrics.timeForCommittedJobs), System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
}
}
}
@@ -382,17 +350,20 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe
private final String jobUri;
private final Properties jobConfig;
private final JobListener jobListener;
+ private final Long creationTimeInMillis;
public NonScheduledJobRunner(String jobUri, Properties jobConfig, JobListener jobListener) {
this.jobUri = jobUri;
this.jobConfig = jobConfig;
this.jobListener = jobListener;
+ this.creationTimeInMillis = System.currentTimeMillis();
}
@Override
public void run() {
try {
((MetricsTrackingListener)jobListener).metrics.updateTimeBeforeJobLaunching(this.jobConfig);
+ ((MetricsTrackingListener)jobListener).metrics.updateTimeBetweenJobSchedulingAndJobLaunching(this.creationTimeInMillis, System.currentTimeMillis());
GobblinHelixJobScheduler.this.runJob(this.jobConfig, this.jobListener);
// remove non-scheduled job catalog once done so it won't be re-executed
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index a3fddab..3ec40dc 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -36,6 +36,11 @@ import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -81,6 +86,9 @@ import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.JvmUtils;
import org.apache.gobblin.util.PathUtils;
+import javax.annotation.Nonnull;
+import lombok.Getter;
+
import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR;
@@ -109,7 +117,7 @@ import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER
* @author Yinan Li
*/
@Alpha
-public class GobblinTaskRunner {
+public class GobblinTaskRunner implements StandardMetricsBridge {
private static final Logger logger = LoggerFactory.getLogger(GobblinTaskRunner.class);
static final java.nio.file.Path CLUSTER_CONF_PATH = Paths.get("generated-gobblin-cluster.conf");
@@ -141,6 +149,8 @@ public class GobblinTaskRunner {
private final String applicationName;
private final String applicationId;
private final Path appWorkPath;
+ private final MetricContext metricContext;
+ private final StandardMetricsBridge.StandardMetrics metrics;
public GobblinTaskRunner(String applicationName, String helixInstanceName, String applicationId,
String taskRunnerId, Config config, Optional<Path> appWorkDirOptional)
@@ -160,8 +170,10 @@ public class GobblinTaskRunner {
initHelixManager();
this.containerMetrics = buildContainerMetrics();
-
- this.taskStateModelFactory = registerHelixTaskFactory();
+ TaskFactoryBuilder builder = new TaskFactoryBuilder(this.config);
+ this.taskStateModelFactory = createTaskStateModelFactory(builder.build());
+ this.metrics = builder.getTaskMetrics();
+ this.metricContext = builder.getMetricContext();
services.addAll(getServices());
if (services.isEmpty()) {
@@ -174,6 +186,38 @@ public class GobblinTaskRunner {
applicationName, helixInstanceName, applicationId, taskRunnerId, config, appWorkDirOptional);
}
+ private class TaskFactoryBuilder {
+ private final boolean isRunTaskInSeparateProcessEnabled;
+ private final TaskFactory taskFactory;
+ @Getter
+ private final MetricContext metricContext;
+ @Getter
+ private StandardMetricsBridge.StandardMetrics taskMetrics;
+
+ public TaskFactoryBuilder(Config config) {
+ isRunTaskInSeparateProcessEnabled = getIsRunTaskInSeparateProcessEnabled(config);
+ metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.getClass());
+ if (isRunTaskInSeparateProcessEnabled) {
+ logger.info("Running a task in a separate process is enabled.");
+ taskFactory = new HelixTaskFactory(GobblinTaskRunner.this.containerMetrics, CLUSTER_CONF_PATH, config);
+ taskMetrics = new GobblinTaskRunnerMetrics.JvmTaskRunnerMetrics();
+ } else {
+ Properties properties = ConfigUtils.configToProperties(config);
+ TaskExecutor taskExecutor = new TaskExecutor(properties);
+ taskFactory = getInProcessTaskFactory(taskExecutor);
+ taskMetrics = new GobblinTaskRunnerMetrics.InProcessTaskRunnerMetrics(taskExecutor, metricContext);
+ }
+ }
+
+ public TaskFactory build(){
+ return taskFactory;
+ }
+
+ private Boolean getIsRunTaskInSeparateProcessEnabled(Config config) {
+ return ConfigUtils.getBoolean(config, GobblinClusterConfigurationKeys.ENABLE_TASK_IN_SEPARATE_PROCESS, false);
+ }
+ }
+
private Path initAppWorkDir(Config config, Optional<Path> appWorkDirOptional) {
return appWorkDirOptional.isPresent() ? appWorkDirOptional.get() : GobblinClusterUtils
.getAppWorkDirPathFromConfig(config, this.fs, this.applicationName, this.applicationId);
@@ -189,19 +233,10 @@ public class GobblinTaskRunner {
this.helixInstanceName, InstanceType.PARTICIPANT, zkConnectionString);
}
- private TaskStateModelFactory registerHelixTaskFactory() {
+ private TaskStateModelFactory createTaskStateModelFactory(TaskFactory factory) {
Map<String, TaskFactory> taskFactoryMap = Maps.newHashMap();
- boolean isRunTaskInSeparateProcessEnabled = getIsRunTaskInSeparateProcessEnabled();
- TaskFactory taskFactory;
- if (isRunTaskInSeparateProcessEnabled) {
- logger.info("Running a task in a separate process is enabled.");
- taskFactory = new HelixTaskFactory(this.containerMetrics, CLUSTER_CONF_PATH, config);
- } else {
- taskFactory = getInProcessTaskFactory();
- }
-
- taskFactoryMap.put(GOBBLIN_TASK_FACTORY_NAME, taskFactory);
+ taskFactoryMap.put(GOBBLIN_TASK_FACTORY_NAME, factory);
TaskStateModelFactory taskStateModelFactory =
new TaskStateModelFactory(this.helixManager, taskFactoryMap);
this.helixManager.getStateMachineEngine()
@@ -209,14 +244,13 @@ public class GobblinTaskRunner {
return taskStateModelFactory;
}
- private TaskFactory getInProcessTaskFactory() {
+ private TaskFactory getInProcessTaskFactory(TaskExecutor taskExecutor) {
Properties properties = ConfigUtils.configToProperties(this.config);
URI rootPathUri = PathUtils.getRootPath(this.appWorkPath).toUri();
Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(properties)
.withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY,
ConfigValueFactory.fromAnyRef(rootPathUri.toString()));
- TaskExecutor taskExecutor = new TaskExecutor(properties);
TaskStateTracker taskStateTracker = new GobblinHelixTaskStateTracker(properties);
services.add(taskExecutor);
@@ -385,6 +419,22 @@ public class GobblinTaskRunner {
}
}
+ @Override
+ public StandardMetrics getStandardMetrics() {
+ return this.metrics;
+ }
+
+ @Nonnull
+ @Override
+ public MetricContext getMetricContext() {
+ return this.metricContext;
+ }
+
+ @Override
+ public boolean isInstrumentationEnabled() {
+ return GobblinMetrics.isEnabled(this.config);
+ }
+
/**
* A custom {@link MessageHandlerFactory} for {@link ParticipantShutdownMessageHandler}s that handle messages
* of type "SHUTDOWN" for shutting down the participants.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java
new file mode 100644
index 0000000..51e8b36
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import com.codahale.metrics.Metric;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.ContextAwareMetric;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.runtime.TaskExecutor;
+
+
+public class GobblinTaskRunnerMetrics {
+
+ static class InProcessTaskRunnerMetrics extends StandardMetricsBridge.StandardMetrics {
+ private TaskExecutor taskExecutor;
+ private static String CURRENT_QUEUED_TASK_COUNT = "currentQueuedTaskCount";
+ private static String HISTORICAL_QUEUED_TASK_COUNT = "historicalQueuedTaskCount";
+ private static String QUEUED_TASK_COUNT = "queuedTaskCount";
+ private static String CURRENT_QUEUED_TASK_TOTAL_TIME = "currentQueuedTaskTotalTime";
+ private static String HISTORICAL_QUEUED_TASK_TOTAL_TIME = "historicalQueuedTaskTotalTime";
+ private static String QUEUED_TASK_TOTAL_TIME = "queuedTaskTotalTime";
+ private static String FAILED_TASK_COUNT = "failedTaskCount";
+ private static String SUCCESSFUL_TASK_COUNT = "successfulTaskCount";
+ private static String RUNNING_TASK_COUNT = "runningTaskCount";
+
+ public InProcessTaskRunnerMetrics (TaskExecutor executor, MetricContext context) {
+ taskExecutor = executor;
+ contextAwareMetrics.add(context.newContextAwareGauge(CURRENT_QUEUED_TASK_COUNT, ()->this.taskExecutor.getCurrentQueuedTaskCount().longValue()));
+ contextAwareMetrics.add(context.newContextAwareGauge(CURRENT_QUEUED_TASK_TOTAL_TIME, ()->this.taskExecutor.getCurrentQueuedTaskTotalTime().longValue()));
+ contextAwareMetrics.add(context.newContextAwareGauge(HISTORICAL_QUEUED_TASK_COUNT, ()->this.taskExecutor.getHistoricalQueuedTaskCount().longValue()));
+ contextAwareMetrics.add(context.newContextAwareGauge(HISTORICAL_QUEUED_TASK_TOTAL_TIME, ()->this.taskExecutor.getHistoricalQueuedTaskTotalTime().longValue()));
+ contextAwareMetrics.add(context.newContextAwareGauge(QUEUED_TASK_COUNT, ()->this.taskExecutor.getQueuedTaskCount().longValue()));
+ contextAwareMetrics.add(context.newContextAwareGauge(QUEUED_TASK_TOTAL_TIME, ()->this.taskExecutor.getQueuedTaskTotalTime().longValue()));
+ contextAwareMetrics.add(context.newContextAwareGauge(FAILED_TASK_COUNT, ()->this.taskExecutor.getFailedTaskCount().getCount()));
+ contextAwareMetrics.add(context.newContextAwareGauge(SUCCESSFUL_TASK_COUNT, ()->this.taskExecutor.getSuccessfulTaskCount().getCount()));
+ contextAwareMetrics.add(context.newContextAwareGauge(RUNNING_TASK_COUNT, ()->this.taskExecutor.getRunningTaskCount().getCount()));
+
+ this.rawMetrics.put(ConfigurationKeys.WORK_UNIT_CREATION_AND_RUN_INTERVAL, this.taskExecutor.getTaskCreateAndRunTimer());
+ }
+
+ @Override
+ public String getName() {
+ return InProcessTaskRunnerMetrics.class.getName();
+ }
+ }
+
+ static class JvmTaskRunnerMetrics extends StandardMetricsBridge.StandardMetrics {
+ //TODO: add metrics to monitor the process execution status (will be revisited after process isolation work is done)
+ @Override
+ public String getName() {
+ return JvmTaskRunnerMetrics.class.getName();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java
index 3993dce..355139b 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java
@@ -18,15 +18,18 @@
package org.apache.gobblin.instrumented;
import java.util.Collection;
+import java.util.List;
+import java.util.Map;
-import org.apache.gobblin.metrics.ContextAwareCounter;
-import org.apache.gobblin.metrics.ContextAwareGauge;
-import org.apache.gobblin.metrics.ContextAwareHistogram;
-import org.apache.gobblin.metrics.ContextAwareMeter;
-import org.apache.gobblin.metrics.ContextAwareTimer;
-
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.gobblin.metrics.ContextAwareMetric;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
/**
* This interface indicates a class will expose its metrics to some external systems.
@@ -35,30 +38,37 @@ public interface StandardMetricsBridge extends Instrumentable {
StandardMetrics getStandardMetrics();
- public class StandardMetrics {
+ default void switchMetricContext(MetricContext context) {
+ throw new UnsupportedOperationException();
+ }
- public String getName() {
- return this.getClass().getName();
- }
+ default void switchMetricContext(List<Tag<?>> tags) {
+ throw new UnsupportedOperationException();
+ }
- public Collection<ContextAwareGauge<?>> getGauges() {
- return ImmutableList.of();
- }
+ default List<Tag<?>> generateTags(org.apache.gobblin.configuration.State state) {
+ return ImmutableList.of();
+ }
+
+ public class StandardMetrics implements MetricSet {
+ protected final List<ContextAwareMetric> contextAwareMetrics;
+ protected final Map<String, Metric> rawMetrics;
- public Collection<ContextAwareCounter> getCounters() {
- return ImmutableList.of();
+ public StandardMetrics() {
+ this.contextAwareMetrics = Lists.newArrayList();
+ this.rawMetrics = Maps.newHashMap();
}
- public Collection<ContextAwareMeter> getMeters() {
- return ImmutableList.of();
+ public String getName() {
+ return this.getClass().getName();
}
- public Collection<ContextAwareTimer> getTimers() {
- return ImmutableList.of();
+ public Collection<ContextAwareMetric> getContextAwareMetrics() {
+ return contextAwareMetrics;
}
- public Collection<ContextAwareHistogram> getHistograms() {
- return ImmutableList.of();
+ public Map<String, Metric> getMetrics() {
+ return rawMetrics;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
index 5fd5413..6d8de39 100644
--- a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
@@ -34,6 +34,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.instrumented.StandardMetricsBridge;
import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.ContextAwareMetric;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
@@ -199,12 +200,6 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S
}
private class Metrics extends StandardMetricsBridge.StandardMetrics {
- private ContextAwareGauge<Integer> jobSpecQueueSize;
- private ContextAwareGauge<Long> jobSpecEnq;
- private ContextAwareGauge<Long> jobSpecDeq;
- private ContextAwareGauge<Long> jobSpecConsumed;
- private ContextAwareGauge<Long> jobSpecParseFailures;
-
private AtomicLong jobSpecEnqCount = new AtomicLong(0);
private AtomicLong jobSpecDeqCount = new AtomicLong(0);
@@ -215,12 +210,12 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S
public static final String SPEC_CONSUMER_JOB_SPEC_PARSE_FAILURES = "specConsumerJobSpecParseFailures";
public Metrics(MetricContext context) {
- this.jobSpecQueueSize = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_QUEUE_SIZE, ()->StreamingKafkaSpecConsumer.this._jobSpecQueue.size());
- this.jobSpecEnq = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_ENQ, ()->jobSpecEnqCount.get());
- this.jobSpecDeq = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_DEQ, ()->jobSpecDeqCount.get());
- this.jobSpecConsumed = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_CONSUMED,
- ()->getNewSpecs() + getRemovedSpecs() + getMessageParseFailures());
- this.jobSpecParseFailures = context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_PARSE_FAILURES, ()->getMessageParseFailures());
+ this.contextAwareMetrics.add(context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_QUEUE_SIZE, ()->StreamingKafkaSpecConsumer.this._jobSpecQueue.size()));
+ this.contextAwareMetrics.add(context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_ENQ, ()->jobSpecEnqCount.get()));
+ this.contextAwareMetrics.add(context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_DEQ, ()->jobSpecDeqCount.get()));
+ this.contextAwareMetrics.add(context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_CONSUMED,
+ ()->getNewSpecs() + getRemovedSpecs() + getMessageParseFailures()));
+ this.contextAwareMetrics.add(context.newContextAwareGauge(SPEC_CONSUMER_JOB_SPEC_PARSE_FAILURES, ()->getMessageParseFailures()));
}
private long getNewSpecs() {
@@ -237,16 +232,6 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S
return StreamingKafkaSpecConsumer.this._jobMonitor.getMessageParseFailures() != null?
StreamingKafkaSpecConsumer.this._jobMonitor.getMessageParseFailures().getCount():0;
}
-
- public Collection<ContextAwareGauge<?>> getGauges() {
- List list = Lists.newArrayList();
- list.add(jobSpecQueueSize);
- list.add(jobSpecEnq);
- list.add(jobSpecDeq);
- list.add(jobSpecConsumed);
- list.add(jobSpecParseFailures);
- return list;
- }
}
@Override
@@ -264,19 +249,4 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S
public boolean isInstrumentationEnabled() {
return _isInstrumentedEnabled;
}
-
- @Override
- public List<Tag<?>> generateTags(org.apache.gobblin.configuration.State state) {
- return ImmutableList.of();
- }
-
- @Override
- public void switchMetricContext(List<Tag<?>> tags) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void switchMetricContext(MetricContext context) {
- throw new UnsupportedOperationException();
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
index be78275..b868893 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
@@ -39,6 +39,8 @@ import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricSet;
+import com.codahale.metrics.SlidingTimeWindowReservoir;
+import com.codahale.metrics.Timer;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
@@ -75,12 +77,15 @@ public class TaskExecutor extends AbstractIdleService {
private final ExecutorService forkExecutor;
// Task retry interval
+ @Getter
private final long retryIntervalInSeconds;
// The maximum number of items in the queued task time map.
+ @Getter
private final int queuedTaskTimeMaxSize;
// The maximum age of the items in the queued task time map.
+ @Getter
private final long queuedTaskTimeMaxAge ;
// Map of queued task ids to queue times. The key is the task id, the value is the time the task was queued. If the
@@ -96,32 +101,44 @@ public class TaskExecutor extends AbstractIdleService {
private long lastCalculationTime = 0;
// The total number of tasks currently queued and queued over the historical lookback period.
+ @Getter
private AtomicInteger queuedTaskCount = new AtomicInteger();
// The total number of tasks currently queued.
+ @Getter
private AtomicInteger currentQueuedTaskCount = new AtomicInteger();
// The total number of tasks queued over the historical lookback period.
+ @Getter
private AtomicInteger historicalQueuedTaskCount = new AtomicInteger();
// The total time tasks have currently been in the queue and were in the queue during the historical lookback period.
+ @Getter
private AtomicLong queuedTaskTotalTime = new AtomicLong();
// The total time tasks have currently been in the queue.
+ @Getter
private AtomicLong currentQueuedTaskTotalTime = new AtomicLong();
// The total time tasks have been in the queue during the historical lookback period.
+ @Getter
private AtomicLong historicalQueuedTaskTotalTime = new AtomicLong();
// Count of running tasks.
+ @Getter
private final Counter runningTaskCount = new Counter();
// Count of failed tasks.
+ @Getter
private final Meter successfulTaskCount = new Meter();
// Count of failed tasks.
+ @Getter
private final Meter failedTaskCount = new Meter();
+ @Getter
+ private final Timer taskCreateAndRunTimer;
+
// The metric set exposed from the task executor.
private final TaskExecutorQueueMetricSet metricSet = new TaskExecutorQueueMetricSet();
@@ -129,7 +146,7 @@ public class TaskExecutor extends AbstractIdleService {
* Constructor used internally.
*/
private TaskExecutor(int taskExecutorThreadPoolSize, int coreRetryThreadPoolSize, long retryIntervalInSeconds,
- int queuedTaskTimeMaxSize, long queuedTaskTimeMaxAge) {
+ int queuedTaskTimeMaxSize, long queuedTaskTimeMaxAge, int timerWindowSize) {
Preconditions.checkArgument(taskExecutorThreadPoolSize > 0, "Task executor thread pool size should be positive");
Preconditions.checkArgument(retryIntervalInSeconds > 0, "Task retry interval should be positive");
Preconditions.checkArgument(queuedTaskTimeMaxSize > 0, "Queued task time max size should be positive");
@@ -143,6 +160,7 @@ public class TaskExecutor extends AbstractIdleService {
this.retryIntervalInSeconds = retryIntervalInSeconds;
this.queuedTaskTimeMaxSize = queuedTaskTimeMaxSize;
this.queuedTaskTimeMaxAge = queuedTaskTimeMaxAge;
+ this.taskCreateAndRunTimer = new Timer(new SlidingTimeWindowReservoir(timerWindowSize, TimeUnit.MINUTES));
this.forkExecutor = ExecutorsUtils.loggingDecorator(
new ThreadPoolExecutor(
@@ -175,7 +193,9 @@ public class TaskExecutor extends AbstractIdleService {
Integer.parseInt(properties.getProperty(ConfigurationKeys.QUEUED_TASK_TIME_MAX_SIZE,
Integer.toString(ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_SIZE))),
Long.parseLong(properties.getProperty(ConfigurationKeys.QUEUED_TASK_TIME_MAX_AGE,
- Long.toString(ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_AGE))));
+ Long.toString(ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_AGE))),
+ Integer.parseInt(properties.getProperty(ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES,
+ Integer.toString(ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES))));
}
/**
@@ -191,7 +211,9 @@ public class TaskExecutor extends AbstractIdleService {
conf.getInt(ConfigurationKeys.QUEUED_TASK_TIME_MAX_SIZE,
ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_SIZE),
conf.getLong(ConfigurationKeys.QUEUED_TASK_TIME_MAX_AGE,
- ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_AGE));
+ ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_AGE),
+ conf.getInt(ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES,
+ ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES));
}
@Override
@@ -430,7 +452,12 @@ public class TaskExecutor extends AbstractIdleService {
private void onStart(long startTime) {
Long queueTime = queuedTasks.remove(this.underlyingTask.getTaskId());
+ long workUnitCreationTime = this.underlyingTask.getTaskContext().getTaskState().getPropAsLong(ConfigurationKeys.WORK_UNIT_CREATION_TIME_IN_MILLIS, 0);
long timeInQueue = startTime - queueTime;
+ long timeSinceWorkUnitCreation = startTime - workUnitCreationTime;
+
+ taskCreateAndRunTimer.update(timeSinceWorkUnitCreation, TimeUnit.MILLISECONDS);
+
LOG.debug(String.format("Task %s started. Saving queued time of %d ms to history.", underlyingTask.getTaskId(), timeInQueue));
queuedTaskTimeHistorical.putIfAbsent(System.currentTimeMillis(), timeInQueue);
runningTaskCount.inc();
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
index 42ecef3..6afa94a 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
@@ -18,23 +18,27 @@ package org.apache.gobblin.runtime.api;
import java.net.URI;
import java.util.Collection;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.GobblinMetricsKeys;
import org.apache.gobblin.instrumented.Instrumentable;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.instrumented.StandardMetricsBridge;
-import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.ContextAwareMetric;
import org.apache.gobblin.metrics.ContextAwareTimer;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.util.ConfigUtils;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -76,6 +80,7 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable
public static final String JOB_UPDATED_OPERATION_TYPE = "JobUpdated";
private final MetricContext metricsContext;
+ protected final int timeWindowSizeInMinutes;
@Getter private final AtomicLong totalAddedJobs;
@Getter private final AtomicLong totalDeletedJobs;
@Getter private final AtomicLong totalUpdatedJobs;
@@ -85,17 +90,28 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable
@Getter private final ContextAwareGauge<Long> totalUpdateCalls;
@Getter private final ContextAwareGauge<Integer> numActiveJobs;
- public StandardMetrics(final JobCatalog jobCatalog) {
+ public StandardMetrics(final JobCatalog jobCatalog, Optional<Config> sysConfig) {
+ // timer window size
+ this.timeWindowSizeInMinutes = sysConfig.isPresent()?
+ ConfigUtils.getInt(sysConfig.get(), ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES, ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES) :
+ ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES;
+
this.metricsContext = jobCatalog.getMetricContext();
this.totalAddedJobs = new AtomicLong(0);
this.totalDeletedJobs = new AtomicLong(0);
this.totalUpdatedJobs = new AtomicLong(0);
- this.timeForJobCatalogGet = metricsContext.contextAwareTimer(TIME_FOR_JOB_CATALOG_GET, 1, TimeUnit.MINUTES);
+ this.timeForJobCatalogGet = metricsContext.contextAwareTimer(TIME_FOR_JOB_CATALOG_GET, timeWindowSizeInMinutes, TimeUnit.MINUTES);
this.totalAddCalls = metricsContext.newContextAwareGauge(TOTAL_ADD_CALLS, ()->this.totalAddedJobs.get());
this.totalUpdateCalls = metricsContext.newContextAwareGauge(TOTAL_UPDATE_CALLS, ()->this.totalUpdatedJobs.get());
this.totalDeleteCalls = metricsContext.newContextAwareGauge(TOTAL_DELETE_CALLS, ()->this.totalDeletedJobs.get());
this.numActiveJobs = metricsContext.newContextAwareGauge(NUM_ACTIVE_JOBS_NAME, ()->(int)(totalAddedJobs.get() - totalDeletedJobs.get()));
+
+ this.contextAwareMetrics.add(timeForJobCatalogGet);
+ this.contextAwareMetrics.add(totalAddCalls);
+ this.contextAwareMetrics.add(totalDeleteCalls);
+ this.contextAwareMetrics.add(totalUpdateCalls);
+ this.contextAwareMetrics.add(numActiveJobs);
}
public void updateGetJobTime(long startTime) {
@@ -136,20 +152,5 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable
this.totalUpdatedJobs.incrementAndGet();
submitTrackingEvent(updatedJob, JOB_UPDATED_OPERATION_TYPE);
}
-
- @Override
- public Collection<ContextAwareGauge<?>> getGauges() {
- return ImmutableList.of(totalAddCalls, totalDeleteCalls, totalUpdateCalls, numActiveJobs);
- }
-
- @Override
- public Collection<ContextAwareCounter> getCounters() {
- return ImmutableList.of();
- }
-
- @Override
- public Collection<ContextAwareTimer> getTimers() {
- return ImmutableList.of(timeForJobCatalogGet);
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
index 3f50ee7..7bc9cc0 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
@@ -44,10 +44,12 @@ public interface JobExecutionLauncher extends Instrumentable {
public static final String NUM_JOBS_CANCELLED = "numJobsCancelled";
public static final String NUM_JOBS_RUNNING = "numJobsRunning";
- public static final String TIMER_FOR_JOB_COMPLETION = "timerForJobCompletion";
- public static final String TIMER_FOR_JOB_FAILURE = "timerForJobFailure";
+ public static final String TIMER_FOR_COMPLETED_JOBS = "timeForCompletedJobs";
+ public static final String TIMER_FOR_FAILED_JOBS = "timeForFailedJobs";
+ public static final String TIMER_FOR_COMMITTED_JOBS = "timerForCommittedJobs";
public static final String TIMER_BEFORE_JOB_SCHEDULING = "timerBeforeJobScheduling";
public static final String TIMER_BEFORE_JOB_LAUNCHING = "timerBeforeJobLaunching";
+ public static final String TIMER_BETWEEN_JOB_SCHEDULING_AND_LAUNCHING = "timerBetwenJobSchedulingAndLaunching";
public static final String EXECUTOR_ACTIVE_COUNT = "executorActiveCount";
public static final String EXECUTOR_MAX_POOL_SIZE = "executorMaximumPoolSize";
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java
index 8b6e98c..57dfce5 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableJobCatalog.java
@@ -22,10 +22,14 @@ import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMetric;
import org.apache.gobblin.metrics.ContextAwareTimer;
+import org.apache.gobblin.util.ConfigUtils;
import com.google.common.base.Optional;
+import com.typesafe.config.Config;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -55,10 +59,12 @@ public interface MutableJobCatalog extends JobCatalog {
public static final String TIME_FOR_JOB_CATALOG_PUT = "timeForJobCatalogPut";
@Getter private final ContextAwareTimer timeForJobCatalogPut;
@Getter private final ContextAwareTimer timeForJobCatalogRemove;
- public MutableStandardMetrics(JobCatalog catalog) {
- super(catalog);
- timeForJobCatalogPut = catalog.getMetricContext().contextAwareTimer(TIME_FOR_JOB_CATALOG_PUT, 1, TimeUnit.MINUTES);
- timeForJobCatalogRemove = catalog.getMetricContext().contextAwareTimer(TIME_FOR_JOB_CATALOG_REMOVE, 1, TimeUnit.MINUTES);
+ public MutableStandardMetrics(JobCatalog catalog, Optional<Config> sysConfig) {
+ super(catalog, sysConfig);
+ timeForJobCatalogPut = catalog.getMetricContext().contextAwareTimer(TIME_FOR_JOB_CATALOG_PUT, timeWindowSizeInMinutes, TimeUnit.MINUTES);
+ timeForJobCatalogRemove = catalog.getMetricContext().contextAwareTimer(TIME_FOR_JOB_CATALOG_REMOVE, this.timeWindowSizeInMinutes, TimeUnit.MINUTES);
+ this.contextAwareMetrics.add(timeForJobCatalogPut);
+ this.contextAwareMetrics.add(timeForJobCatalogRemove);
}
public void updatePutJobTime(long startTime) {
@@ -70,14 +76,5 @@ public interface MutableJobCatalog extends JobCatalog {
log.info("updateRemoveJobTime...");
Instrumented.updateTimer(Optional.of(this.timeForJobCatalogRemove), System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
}
-
- @Override
- public Collection<ContextAwareTimer> getTimers() {
- Collection<ContextAwareTimer> all = new ArrayList<>();
- all.addAll(super.getTimers());
- all.add(this.timeForJobCatalogPut);
- all.add(this.timeForJobCatalogRemove);
- return all;
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
index 3aa16be..108a324 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
@@ -22,10 +22,14 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMetric;
import org.apache.gobblin.metrics.ContextAwareTimer;
+import org.apache.gobblin.util.ConfigUtils;
import com.google.common.base.Optional;
+import com.typesafe.config.Config;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -55,10 +59,12 @@ public interface MutableSpecCatalog extends SpecCatalog {
public static final String TIME_FOR_SPEC_CATALOG_PUT = "timeForSpecCatalogPut";
@Getter private final ContextAwareTimer timeForSpecCatalogPut;
@Getter private final ContextAwareTimer timeForSpecCatalogRemove;
- public MutableStandardMetrics(SpecCatalog catalog) {
- super(catalog);
- timeForSpecCatalogPut = catalog.getMetricContext().contextAwareTimer(TIME_FOR_SPEC_CATALOG_PUT, 1, TimeUnit.MINUTES);
- timeForSpecCatalogRemove = catalog.getMetricContext().contextAwareTimer(TIME_FOR_SPEC_CATALOG_REMOVE, 1, TimeUnit.MINUTES);
+ public MutableStandardMetrics(SpecCatalog catalog, Optional<Config> sysConfig) {
+ super(catalog, sysConfig);
+ timeForSpecCatalogPut = catalog.getMetricContext().contextAwareTimer(TIME_FOR_SPEC_CATALOG_PUT, this.timeWindowSizeInMinutes, TimeUnit.MINUTES);
+ timeForSpecCatalogRemove = catalog.getMetricContext().contextAwareTimer(TIME_FOR_SPEC_CATALOG_REMOVE, this.timeWindowSizeInMinutes, TimeUnit.MINUTES);
+ this.contextAwareMetrics.add(timeForSpecCatalogPut);
+ this.contextAwareMetrics.add(timeForSpecCatalogRemove);
}
public void updatePutSpecTime(long startTime) {
@@ -70,14 +76,5 @@ public interface MutableSpecCatalog extends SpecCatalog {
log.info("updateRemoveSpecTime...");
Instrumented.updateTimer(Optional.of(this.timeForSpecCatalogRemove), System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
}
-
- @Override
- public Collection<ContextAwareTimer> getTimers() {
- Collection<ContextAwareTimer> all = new ArrayList<>();
- all.addAll(super.getTimers());
- all.add(this.timeForSpecCatalogPut);
- all.add(this.timeForSpecCatalogRemove);
- return all;
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
index 6e8510f..457be9a 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
@@ -19,21 +19,25 @@ package org.apache.gobblin.runtime.api;
import java.net.URI;
import java.util.Collection;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.GobblinMetricsKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.instrumented.StandardMetricsBridge;
-import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.ContextAwareMetric;
import org.apache.gobblin.metrics.ContextAwareTimer;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.util.ConfigUtils;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -70,6 +74,8 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetr
public static final String TIME_FOR_SPEC_CATALOG_GET = "timeForSpecCatalogGet";
private final MetricContext metricsContext;
+ protected final int timeWindowSizeInMinutes;
+
@Getter private final AtomicLong totalAddedSpecs;
@Getter private final AtomicLong totalDeletedSpecs;
@Getter private final AtomicLong totalUpdatedSpecs;
@@ -80,9 +86,13 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetr
@Getter private final ContextAwareTimer timeForSpecCatalogGet;
- public StandardMetrics(final SpecCatalog specCatalog) {
+ public StandardMetrics(final SpecCatalog specCatalog, Optional<Config> sysConfig) {
+ this.timeWindowSizeInMinutes = sysConfig.isPresent()?
+ ConfigUtils.getInt(sysConfig.get(), ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES, ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES) :
+ ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES;
+
this.metricsContext = specCatalog.getMetricContext();
- this.timeForSpecCatalogGet = metricsContext.contextAwareTimer(TIME_FOR_SPEC_CATALOG_GET, 1, TimeUnit.MINUTES);
+ this.timeForSpecCatalogGet = metricsContext.contextAwareTimer(TIME_FOR_SPEC_CATALOG_GET, timeWindowSizeInMinutes, TimeUnit.MINUTES);
this.totalAddedSpecs = new AtomicLong(0);
this.totalDeletedSpecs = new AtomicLong(0);
this.totalUpdatedSpecs = new AtomicLong(0);
@@ -95,6 +105,12 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetr
this.totalAddCalls = metricsContext.newContextAwareGauge(TOTAL_ADD_CALLS, ()->this.totalAddedSpecs.get());
this.totalUpdateCalls = metricsContext.newContextAwareGauge(TOTAL_UPDATE_CALLS, ()->this.totalUpdatedSpecs.get());
this.totalDeleteCalls = metricsContext.newContextAwareGauge(TOTAL_DELETE_CALLS, ()->this.totalDeletedSpecs.get());
+
+ this.contextAwareMetrics.add(numActiveSpecs);
+ this.contextAwareMetrics.add(totalAddCalls);
+ this.contextAwareMetrics.add(totalUpdateCalls);
+ this.contextAwareMetrics.add(totalDeleteCalls);
+ this.contextAwareMetrics.add(timeForSpecCatalogGet);
}
public void updateGetSpecTime(long startTime) {
@@ -102,21 +118,6 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetr
Instrumented.updateTimer(Optional.of(this.timeForSpecCatalogGet), System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
}
- @Override
- public Collection<ContextAwareGauge<?>> getGauges() {
- return ImmutableList.of(numActiveSpecs, totalAddCalls, totalUpdateCalls, totalDeleteCalls);
- }
-
- @Override
- public Collection<ContextAwareCounter> getCounters() {
- return ImmutableList.of();
- }
-
- @Override
- public Collection<ContextAwareTimer> getTimers() {
- return ImmutableList.of(this.timeForSpecCatalogGet);
- }
-
@Override public void onAddSpec(Spec addedSpec) {
this.totalAddedSpecs.incrementAndGet();
submitTrackingEvent(addedSpec, SPEC_ADDED_OPERATION_TYPE);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java
index 0f99235..f48b42c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/FSJobCatalog.java
@@ -87,9 +87,9 @@ public class FSJobCatalog extends ImmutableFSJobCatalog implements MutableJobCat
}
@Override
- protected JobCatalog.StandardMetrics createStandardMetrics() {
+ protected JobCatalog.StandardMetrics createStandardMetrics(Optional<Config> sysConfig) {
log.info("create standard metrics {} for {}", MutableStandardMetrics.class.getName(), this.getClass().getName());
- return new MutableStandardMetrics(this);
+ return new MutableStandardMetrics(this, sysConfig);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java
index 7162c81..626cac2 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java
@@ -102,7 +102,7 @@ public class ImmutableFSJobCatalog extends JobCatalogBase implements JobCatalog
public ImmutableFSJobCatalog(Config sysConfig, PathAlterationObserver observer, Optional<MetricContext> parentMetricContext,
boolean instrumentationEnabled)
throws IOException {
- super(Optional.of(LOGGER), parentMetricContext, instrumentationEnabled);
+ super(Optional.of(LOGGER), parentMetricContext, instrumentationEnabled, Optional.of(sysConfig));
this.sysConfig = sysConfig;
ConfigAccessor cfgAccessor = new ConfigAccessor(this.sysConfig);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java
index d40c962..dcb0723 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
@@ -63,13 +64,18 @@ public abstract class JobCatalogBase extends AbstractIdleService implements JobC
public JobCatalogBase(Optional<Logger> log, Optional<MetricContext> parentMetricContext,
boolean instrumentationEnabled) {
+ this(log, parentMetricContext, instrumentationEnabled, Optional.absent());
+ }
+
+ public JobCatalogBase(Optional<Logger> log, Optional<MetricContext> parentMetricContext,
+ boolean instrumentationEnabled, Optional<Config> sysConfig) {
this.log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
this.listeners = new JobCatalogListenersList(log);
if (instrumentationEnabled) {
MetricContext realParentCtx =
parentMetricContext.or(Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(), getClass()));
this.metricContext = realParentCtx.childBuilder(JobCatalog.class.getSimpleName()).build();
- this.metrics = createStandardMetrics();
+ this.metrics = createStandardMetrics(sysConfig);
this.addListener(this.metrics);
}
else {
@@ -78,8 +84,8 @@ public abstract class JobCatalogBase extends AbstractIdleService implements JobC
}
}
- protected StandardMetrics createStandardMetrics() {
- return new StandardMetrics(this);
+ protected StandardMetrics createStandardMetrics(Optional<Config> sysConfig) {
+ return new StandardMetrics(this, sysConfig);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 482825f..a91baed 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -87,7 +87,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
MetricContext realParentCtx =
parentMetricContext.or(Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(), getClass()));
this.metricContext = realParentCtx.childBuilder(FlowCatalog.class.getSimpleName()).build();
- this.metrics = new MutableStandardMetrics(this);
+ this.metrics = new MutableStandardMetrics(this, Optional.of(config));
this.addListener(this.metrics);
}
else {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
index c334d2b..5c25a67 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
@@ -94,7 +94,7 @@ public class TopologyCatalog extends AbstractIdleService implements SpecCatalog,
MetricContext realParentCtx =
parentMetricContext.or(Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(), getClass()));
this.metricContext = realParentCtx.childBuilder(TopologyCatalog.class.getSimpleName()).build();
- this.metrics = new SpecCatalog.StandardMetrics(this);
+ this.metrics = new SpecCatalog.StandardMetrics(this, Optional.of(config));
this.addListener(this.metrics);
}
else {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5e6bfb07/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 2cbb113..3137c21 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -21,6 +21,7 @@ import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.instrumented.StandardMetricsBridge;
import org.apache.gobblin.metrics.ContextAwareHistogram;
+import org.apache.gobblin.metrics.ContextAwareMetric;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.service.FlowId;
@@ -155,7 +156,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
}
this.config = config;
this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.getClass());
- this.metrics = new Metrics(this.metricContext);
+ this.metrics = new Metrics(this.metricContext, this.config);
this.serviceId = serviceId;
this.serviceLauncher = new ServiceBasedAppLauncher(properties, serviceName);
@@ -463,31 +464,14 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
return false;
}
- @Override
- public List<Tag<?>> generateTags(State state) {
- return null;
- }
-
- @Override
- public void switchMetricContext(List<Tag<?>> tags) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void switchMetricContext(MetricContext context) {
- throw new UnsupportedOperationException();
- }
-
private class Metrics extends StandardMetrics {
public static final String SERVICE_LEADERSHIP_CHANGE = "serviceLeadershipChange";
private ContextAwareHistogram serviceLeadershipChange;
- public Metrics(final MetricContext metricContext) {
- serviceLeadershipChange = metricContext.contextAwareHistogram(SERVICE_LEADERSHIP_CHANGE, 1, TimeUnit.MINUTES);
- }
- @Override
- public Collection<ContextAwareHistogram> getHistograms() {
- return ImmutableList.of(this.serviceLeadershipChange);
+ public Metrics(final MetricContext metricContext, Config config) {
+ int timeWindowSizeInMinutes = ConfigUtils.getInt(config, ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES, ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES);
+ this.serviceLeadershipChange = metricContext.contextAwareHistogram(SERVICE_LEADERSHIP_CHANGE, timeWindowSizeInMinutes, TimeUnit.MINUTES);
+ this.contextAwareMetrics.add(this.serviceLeadershipChange);
}
}