You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2017/08/15 19:30:24 UTC
incubator-gobblin git commit: [GOBBLIN-202] Added JMX reporting of task execution queue size and time. This can be used as a custom metric to autoscale the cluster
Repository: incubator-gobblin
Updated Branches:
refs/heads/master d0ece1a04 -> dad2a8c2a
[GOBBLIN-202] Added JMX reporting of task execution queue size and time. This can be used as a custom metric to autoscale the cluster
Closes #2039 from kadaan/JMX_metrics_for_Task_Executor
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/dad2a8c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/dad2a8c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/dad2a8c2
Branch: refs/heads/master
Commit: dad2a8c2a75e2b76edf0ee74195c4ba8e6a13f90
Parents: d0ece1a
Author: Joel Baranick <jo...@ensighten.com>
Authored: Tue Aug 15 12:30:19 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Tue Aug 15 12:30:19 2017 -0700
----------------------------------------------------------------------
.../configuration/ConfigurationKeys.java | 5 +
.../gobblin/cluster/GobblinTaskRunner.java | 5 +-
.../apache/gobblin/runtime/TaskExecutor.java | 233 ++++++++++++++++++-
.../runtime/services/JMXReportingService.java | 13 ++
4 files changed, 244 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/dad2a8c2/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 36921be..dd386ef 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -176,6 +176,11 @@ public class ConfigurationKeys {
public static final boolean DEFAULT_CLEANUP_STAGING_DATA_PER_TASK = true;
public static final String CLEANUP_STAGING_DATA_BY_INITIALIZER = "cleanup.staging.data.by.initializer";
+  /**
+   * Maximum number of historical queued-task wait-time samples kept by {@code TaskExecutor} for its queue
+   * metrics; when more samples exist, the oldest ones are pruned.
+   */
+  public static final String QUEUED_TASK_TIME_MAX_SIZE = "taskexecutor.queued_task_time.history.max_size";
+  public static final int DEFAULT_QUEUED_TASK_TIME_MAX_SIZE = 2048;
+  /**
+   * Maximum age, in milliseconds, of historical queued-task wait-time samples kept by {@code TaskExecutor};
+   * older samples are pruned. Defaults to one hour.
+   */
+  public static final String QUEUED_TASK_TIME_MAX_AGE = "taskexecutor.queued_task_time.history.max_age";
+  public static final long DEFAULT_QUEUED_TASK_TIME_MAX_AGE = TimeUnit.HOURS.toMillis(1);
+
/** Optional, for user to specified which template to use, inside .job file */
public static final String JOB_TEMPLATE_PATH = "job.template" ;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/dad2a8c2/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 838abe6..3ddbf55 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -60,6 +60,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.eventbus.EventBus;
@@ -69,6 +70,7 @@ import com.google.common.util.concurrent.ServiceManager;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -159,7 +161,8 @@ public class GobblinTaskRunner {
Path appWorkDir = appWorkDirOptional.isPresent() ? appWorkDirOptional.get() :
GobblinClusterUtils.getAppWorkDirPath(this.fs, applicationName, applicationId);
- List<Service> services = Lists.newArrayList(taskExecutor, taskStateTracker, new JMXReportingService());
+ List<Service> services = Lists.newArrayList(taskExecutor, taskStateTracker,
+ new JMXReportingService(ImmutableMap.of("task.executor" ,taskExecutor.getTaskExecutorQueueMetricSet())));
services.addAll(getServices());
this.serviceManager = new ServiceManager(services);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/dad2a8c2/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
index 2abf131..28ea378 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
@@ -17,7 +17,13 @@
package org.apache.gobblin.runtime;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Properties;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -25,22 +31,32 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
-import org.apache.gobblin.runtime.fork.Fork;
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.runtime.fork.Fork;
import org.apache.gobblin.util.ExecutorsUtils;
import lombok.Getter;
+import static com.codahale.metrics.MetricRegistry.name;
+
/**
* A class for executing {@link Task}s and retrying failed ones as well as for executing {@link Fork}s.
@@ -61,12 +77,63 @@ public class TaskExecutor extends AbstractIdleService {
// Task retry interval
private final long retryIntervalInSeconds;
+  // The maximum number of samples kept in the historical queued task time map.
+  private final int queuedTaskTimeMaxSize;
+
+  // The maximum age, in milliseconds, of the samples kept in the historical queued task time map.
+  private final long queuedTaskTimeMaxAge;
+
+  // Map of queued task ids to queue times. The key is the task id, the value is the epoch time (ms) at which the task
+  // counts as queued. If the task is being retried, the time may be in the future. Entries with time in the future
+  // will not be counted as queued until the time is in the past.
+  private final Map<String, Long> queuedTasks = Maps.newConcurrentMap();
+
+  // Historical queued task times, sorted by key. The key is the epoch time (ms) the task started, the value is the
+  // milliseconds the task waited to start.
+  private final ConcurrentSkipListMap<Long, Long> queuedTaskTimeHistorical = new ConcurrentSkipListMap<>();
+
+  // The timestamp for the last time the metric source data was pruned. Read and written only inside the
+  // synchronized cleanupMetricSources().
+  private long lastCleanupTime = 0;
+
+  // Snapshot (refreshed by cleanupMetricSources()) of the number of tasks currently queued plus those queued over
+  // the historical lookback period.
+  private final AtomicInteger queuedTaskCount = new AtomicInteger();
+
+  // Snapshot of the number of tasks currently queued.
+  private final AtomicInteger currentQueuedTaskCount = new AtomicInteger();
+
+  // Snapshot of the number of tasks queued over the historical lookback period.
+  private final AtomicInteger historicalQueuedTaskCount = new AtomicInteger();
+
+  // Snapshot of the total time (ms) tasks have currently been in the queue plus the time they were in the queue
+  // during the historical lookback period.
+  private final AtomicLong queuedTaskTotalTime = new AtomicLong();
+
+  // Snapshot of the total time (ms) tasks have currently been in the queue.
+  private final AtomicLong currentQueuedTaskTotalTime = new AtomicLong();
+
+  // Snapshot of the total time (ms) tasks were in the queue during the historical lookback period.
+  private final AtomicLong historicalQueuedTaskTotalTime = new AtomicLong();
+
+  // Count of running tasks.
+  private final Counter runningTaskCount = new Counter();
+
+  // Meter of successful task runs.
+  private final Meter successfulTaskCount = new Meter();
+
+  // Meter of failed task runs.
+  private final Meter failedTaskCount = new Meter();
+
+  // The metric set exposed from the task executor.
+  private final TaskExecutorQueueMetricSet metricSet = new TaskExecutorQueueMetricSet();
+
/**
* Constructor used internally.
*/
- private TaskExecutor(int taskExecutorThreadPoolSize, int coreRetryThreadPoolSize, long retryIntervalInSeconds) {
+ private TaskExecutor(int taskExecutorThreadPoolSize, int coreRetryThreadPoolSize, long retryIntervalInSeconds,
+ int queuedTaskTimeMaxSize, long queuedTaskTimeMaxAge) {
Preconditions.checkArgument(taskExecutorThreadPoolSize > 0, "Task executor thread pool size should be positive");
Preconditions.checkArgument(retryIntervalInSeconds > 0, "Task retry interval should be positive");
+ Preconditions.checkArgument(queuedTaskTimeMaxSize > 0, "Queued task time max size should be positive");
+ Preconditions.checkArgument(queuedTaskTimeMaxAge > 0, "Queued task time max age should be positive");
// Currently a fixed-size thread pool is used to execute tasks. We probably need to revisit this later.
this.taskExecutor = Executors.newScheduledThreadPool(
@@ -74,6 +141,8 @@ public class TaskExecutor extends AbstractIdleService {
ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("TaskExecutor-%d")));
this.retryIntervalInSeconds = retryIntervalInSeconds;
+ this.queuedTaskTimeMaxSize = queuedTaskTimeMaxSize;
+ this.queuedTaskTimeMaxAge = queuedTaskTimeMaxAge;
this.forkExecutor = new ThreadPoolExecutor(
// The core thread pool size is equal to that of the task executor as there's at least one fork per task
@@ -100,7 +169,11 @@ public class TaskExecutor extends AbstractIdleService {
Integer.parseInt(properties.getProperty(ConfigurationKeys.TASK_RETRY_THREAD_POOL_CORE_SIZE_KEY,
Integer.toString(ConfigurationKeys.DEFAULT_TASK_RETRY_THREAD_POOL_CORE_SIZE))),
Long.parseLong(properties.getProperty(ConfigurationKeys.TASK_RETRY_INTERVAL_IN_SEC_KEY,
- Long.toString(ConfigurationKeys.DEFAULT_TASK_RETRY_INTERVAL_IN_SEC))));
+ Long.toString(ConfigurationKeys.DEFAULT_TASK_RETRY_INTERVAL_IN_SEC))),
+ Integer.parseInt(properties.getProperty(ConfigurationKeys.QUEUED_TASK_TIME_MAX_SIZE,
+ Integer.toString(ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_SIZE))),
+ Long.parseLong(properties.getProperty(ConfigurationKeys.QUEUED_TASK_TIME_MAX_AGE,
+ Long.toString(ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_AGE))));
}
/**
@@ -112,7 +185,11 @@ public class TaskExecutor extends AbstractIdleService {
conf.getInt(ConfigurationKeys.TASK_RETRY_THREAD_POOL_CORE_SIZE_KEY,
ConfigurationKeys.DEFAULT_TASK_RETRY_THREAD_POOL_CORE_SIZE),
conf.getLong(ConfigurationKeys.TASK_RETRY_INTERVAL_IN_SEC_KEY,
- ConfigurationKeys.DEFAULT_TASK_RETRY_INTERVAL_IN_SEC));
+ ConfigurationKeys.DEFAULT_TASK_RETRY_INTERVAL_IN_SEC),
+ conf.getInt(ConfigurationKeys.QUEUED_TASK_TIME_MAX_SIZE,
+ ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_SIZE),
+ conf.getLong(ConfigurationKeys.QUEUED_TASK_TIME_MAX_AGE,
+ ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_AGE));
}
@Override
@@ -145,7 +222,7 @@ public class TaskExecutor extends AbstractIdleService {
*/
public void execute(Task task) {
LOG.info(String.format("Executing task %s", task.getTaskId()));
- this.taskExecutor.execute(task);
+ // Wrap the task so its queue wait time and success/failure are recorded in the executor's metrics.
+ this.taskExecutor.execute(new TrackingTask(task));
}
/**
@@ -156,7 +233,7 @@ public class TaskExecutor extends AbstractIdleService {
*/
public Future<?> submit(Task task) {
LOG.info(String.format("Submitting task %s", task.getTaskId()));
- return this.taskExecutor.submit(task);
+ // Wrap the task so its queue wait time and success/failure are recorded; the returned Future is the wrapper's.
+ return this.taskExecutor.submit(new TrackingTask(task));
}
/**
@@ -196,8 +273,142 @@ public class TaskExecutor extends AbstractIdleService {
// Task retry interval increases linearly with number of retries
long interval = task.getRetryCount() * this.retryIntervalInSeconds;
// Schedule the retry of the failed task
- this.taskExecutor.schedule(task, interval, TimeUnit.SECONDS);
+ this.taskExecutor.schedule(new TrackingTask(task, interval, TimeUnit.SECONDS), interval, TimeUnit.SECONDS);
LOG.info(String.format("Scheduled retry of failed task %s to run in %d seconds", task.getTaskId(), interval));
task.incrementRetryCount();
}
+
+  /**
+   * Returns the {@link MetricSet} describing this executor's task queue (queued counts and times, and running,
+   * successful and failed tasks), for registration with a metric registry.
+   */
+  public MetricSet getTaskExecutorQueueMetricSet() {
+    return metricSet;
+  }
+
+  /**
+   * Recomputes the queued-task metric snapshots and prunes the historical queue-time samples.
+   *
+   * <p>Does work at most once every 10 seconds; earlier calls only log at debug level and return. Currently queued
+   * tasks whose queue time is still in the future (pending retries) are ignored. Historical samples older than
+   * {@code queuedTaskTimeMaxAge} ms, or beyond the newest {@code queuedTaskTimeMaxSize} samples, are removed.
+   */
+  private synchronized void cleanupMetricSources() {
+    long currentTimeMillis = System.currentTimeMillis();
+    if (lastCleanupTime >= currentTimeMillis - TimeUnit.SECONDS.toMillis(10)) {
+      LOG.debug("Skipped cleanup of metrics sources because not enough time has passed since last cleanup.");
+      return;
+    }
+
+    int currentCount = 0;
+    long currentTotalTime = 0;
+    for (Map.Entry<String, Long> queuedTask : this.queuedTasks.entrySet()) {
+      long queuedSince = queuedTask.getValue();
+      if (queuedSince <= currentTimeMillis) {
+        currentCount++;
+        // Accumulate how long the task has been waiting, not its raw enqueue timestamp.
+        currentTotalTime += currentTimeMillis - queuedSince;
+      }
+    }
+    this.currentQueuedTaskCount.set(currentCount);
+    this.currentQueuedTaskTotalTime.set(currentTotalTime);
+
+    int historicalCount = 0;
+    long historicalTotalTime = 0;
+    long cutoff = currentTimeMillis - this.queuedTaskTimeMaxAge;
+    // Walk newest-first so that, once the size cap is reached, the oldest samples are the ones dropped.
+    Iterator<Map.Entry<Long, Long>> iterator = this.queuedTaskTimeHistorical.descendingMap().entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<Long, Long> historicalQueuedTask;
+      try {
+        historicalQueuedTask = iterator.next();
+      } catch (NoSuchElementException e) {
+        // Defensive: the map is modified concurrently; stop if the iterator is unexpectedly exhausted.
+        LOG.warn("Ran out of items in historical task queue time set.");
+        break;
+      }
+      if (historicalQueuedTask.getKey() < cutoff || historicalCount >= this.queuedTaskTimeMaxSize) {
+        iterator.remove();
+      } else {
+        historicalCount++;
+        historicalTotalTime += historicalQueuedTask.getValue();
+      }
+    }
+    this.historicalQueuedTaskCount.set(historicalCount);
+    this.historicalQueuedTaskTotalTime.set(historicalTotalTime);
+
+    this.queuedTaskCount.set(currentCount + historicalCount);
+    this.queuedTaskTotalTime.set(currentTotalTime + historicalTotalTime);
+
+    this.lastCleanupTime = currentTimeMillis;
+  }
+
+  /**
+   * Exposes the executor's queue metrics. Each gauge first refreshes the throttled snapshots through
+   * {@code cleanupMetricSources()} and then reads the corresponding snapshot value.
+   */
+  private class TaskExecutorQueueMetricSet implements MetricSet {
+    @Override
+    public Map<String, Metric> getMetrics() {
+      Map<String, Metric> result = new HashMap<>();
+      result.put(name("queued", "current", "count"), countGauge(currentQueuedTaskCount));
+      result.put(name("queued", "historical", "count"), countGauge(historicalQueuedTaskCount));
+      result.put(name("queued", "count"), countGauge(queuedTaskCount));
+      result.put(name("queued", "current", "time", "total"), timeGauge(currentQueuedTaskTotalTime));
+      result.put(name("queued", "historical", "time", "total"), timeGauge(historicalQueuedTaskTotalTime));
+      result.put(name("queued", "time", "total"), timeGauge(queuedTaskTotalTime));
+      result.put(name("running", "count"), runningTaskCount);
+      result.put(name("successful", "count"), successfulTaskCount);
+      result.put(name("failed", "count"), failedTaskCount);
+      return Collections.unmodifiableMap(result);
+    }
+
+    /** Builds an Integer gauge that refreshes the snapshots and then reads {@code source}. */
+    private Gauge<Integer> countGauge(final AtomicInteger source) {
+      return new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          cleanupMetricSources();
+          return source.intValue();
+        }
+      };
+    }
+
+    /** Builds a Long gauge that refreshes the snapshots and then reads {@code source}. */
+    private Gauge<Long> timeGauge(final AtomicLong source) {
+      return new Gauge<Long>() {
+        @Override
+        public Long getValue() {
+          cleanupMetricSources();
+          return source.longValue();
+        }
+      };
+    }
+  }
+
+  /**
+   * Wraps a {@link Task} to record queue-wait and outcome metrics.
+   *
+   * <p>On construction, the task id is registered as queued at the current time plus any scheduling delay (only if
+   * the id is not already registered). When run, the wait time is added to the historical samples, the running
+   * counter is held for the duration of the run, and the success or failure meter is marked. Anything thrown by the
+   * task is rethrown unchanged.
+   */
+  private class TrackingTask implements Runnable {
+    private final Task underlyingTask;
+
+    public TrackingTask(Task task) {
+      this(task, 0, TimeUnit.SECONDS);
+    }
+
+    /**
+     * @param task the task to run
+     * @param interval delay before the task is due to start; a task is not counted as queued before it is due
+     * @param timeUnit unit of {@code interval}
+     */
+    public TrackingTask(Task task, long interval, TimeUnit timeUnit) {
+      queuedTasks.putIfAbsent(task.getTaskId(), System.currentTimeMillis() + timeUnit.toMillis(interval));
+      this.underlyingTask = task;
+    }
+
+    @Override
+    public void run() {
+      long startTime = System.currentTimeMillis();
+      onStart(startTime);
+      try {
+        this.underlyingTask.run();
+        successfulTaskCount.mark();
+      } catch (Throwable t) {
+        // Count Errors as failures too; precise rethrow keeps run() free of checked exceptions.
+        failedTaskCount.mark();
+        throw t;
+      } finally {
+        runningTaskCount.dec();
+      }
+    }
+
+    /** Records this task's queue-wait sample (when its queue time is known) and marks it as running. */
+    private void onStart(long startTime) {
+      Long queueTime = queuedTasks.remove(this.underlyingTask.getTaskId());
+      // The entry can be missing, e.g. when the same task id was wrapped twice and an earlier run already removed
+      // it. Skip the sample instead of failing the task with an unboxing NullPointerException.
+      if (queueTime != null) {
+        // Guard against negative waits (e.g. wall-clock adjustments or a retry firing just before its due time).
+        long waitTime = Math.max(0L, startTime - queueTime);
+        // Keys are start timestamps: probe forward on collisions so that tasks starting in the same millisecond
+        // do not silently drop each other's samples.
+        long key = startTime;
+        while (queuedTaskTimeHistorical.putIfAbsent(key, waitTime) != null) {
+          key++;
+        }
+      }
+      runningTaskCount.inc();
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/dad2a8c2/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/JMXReportingService.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/JMXReportingService.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/JMXReportingService.java
index 2f3800b..99d62ae 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/JMXReportingService.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/JMXReportingService.java
@@ -29,6 +29,7 @@ import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
+import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AbstractIdleService;
@@ -51,11 +52,20 @@ import com.google.common.util.concurrent.AbstractIdleService;
public class JMXReportingService extends AbstractIdleService {
private final MetricRegistry metricRegistry = new MetricRegistry();
+  // Extra metric sets to register, keyed by metric-name prefix; assigned once in the constructor.
+  private final Map<String, MetricSet> additionalMetricSets;
private final JmxReporter jmxReporter = JmxReporter.forRegistry(this.metricRegistry)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build();
+  /** Creates a reporting service that registers only the built-in JVM metric sets. */
+  public JMXReportingService() {
+    this(ImmutableMap.<String, MetricSet>of());
+  }
+
+  /**
+   * Creates a reporting service that also registers each given metric set, using its map key as the metric-name
+   * prefix.
+   *
+   * @param additionalMetricSets metric sets to register, keyed by prefix; copied defensively
+   * @throws NullPointerException if the map, or any key or value in it, is null
+   */
+  public JMXReportingService(Map<String, MetricSet> additionalMetricSets) {
+    // Copy so later changes to the caller's map cannot affect what is registered, and reject nulls here rather
+    // than during start-up.
+    this.additionalMetricSets = ImmutableMap.copyOf(additionalMetricSets);
+  }
+
@Override
protected void startUp() throws Exception {
registerJvmMetrics();
@@ -72,6 +82,9 @@ public class JMXReportingService extends AbstractIdleService {
registerMetricSetWithPrefix("jvm.memory", new MemoryUsageGaugeSet());
registerMetricSetWithPrefix("jvm.threads", new ThreadStatesGaugeSet());
this.metricRegistry.register("jvm.fileDescriptorRatio", new FileDescriptorRatioGauge());
+ for (Map.Entry<String, MetricSet> metricSet : this.additionalMetricSets.entrySet()) {
+ registerMetricSetWithPrefix(metricSet.getKey(), metricSet.getValue());
+ }
}
private void registerMetricSetWithPrefix(String prefix, MetricSet metricSet) {