You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2017/08/15 19:30:24 UTC

incubator-gobblin git commit: [GOBBLIN-202] Added JMX reporting of task execution queue size and time. This can be used as a custom metric to autoscale the cluster

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master d0ece1a04 -> dad2a8c2a


[GOBBLIN-202] Added JMX reporting of task execution queue size and time. This can be used as a custom metric to autoscale the cluster

Closes #2039 from
kadaan/JMX_metrics_for_Task_Executor


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/dad2a8c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/dad2a8c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/dad2a8c2

Branch: refs/heads/master
Commit: dad2a8c2a75e2b76edf0ee74195c4ba8e6a13f90
Parents: d0ece1a
Author: Joel Baranick <jo...@ensighten.com>
Authored: Tue Aug 15 12:30:19 2017 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Tue Aug 15 12:30:19 2017 -0700

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |   5 +
 .../gobblin/cluster/GobblinTaskRunner.java      |   5 +-
 .../apache/gobblin/runtime/TaskExecutor.java    | 233 ++++++++++++++++++-
 .../runtime/services/JMXReportingService.java   |  13 ++
 4 files changed, 244 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/dad2a8c2/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 36921be..dd386ef 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -176,6 +176,11 @@ public class ConfigurationKeys {
   public static final boolean DEFAULT_CLEANUP_STAGING_DATA_PER_TASK = true;
   public static final String CLEANUP_STAGING_DATA_BY_INITIALIZER = "cleanup.staging.data.by.initializer";
 
+  public static final String QUEUED_TASK_TIME_MAX_SIZE = "taskexecutor.queued_task_time.history.max_size";
+  public static final int DEFAULT_QUEUED_TASK_TIME_MAX_SIZE = 2048;
+  public static final String QUEUED_TASK_TIME_MAX_AGE = "taskexecutor.queued_task_time.history.max_age";
+  public static final long DEFAULT_QUEUED_TASK_TIME_MAX_AGE = TimeUnit.HOURS.toMillis(1);
+
   /** Optional, for user to specified which template to use, inside .job file */
   public static final String JOB_TEMPLATE_PATH = "job.template" ;
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/dad2a8c2/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 838abe6..3ddbf55 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -60,6 +60,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.eventbus.EventBus;
@@ -69,6 +70,7 @@ import com.google.common.util.concurrent.ServiceManager;
 
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -159,7 +161,8 @@ public class GobblinTaskRunner {
     Path appWorkDir = appWorkDirOptional.isPresent() ? appWorkDirOptional.get() :
         GobblinClusterUtils.getAppWorkDirPath(this.fs, applicationName, applicationId);
 
-    List<Service> services = Lists.newArrayList(taskExecutor, taskStateTracker, new JMXReportingService());
+    List<Service> services = Lists.newArrayList(taskExecutor, taskStateTracker,
+        new JMXReportingService(ImmutableMap.of("task.executor" ,taskExecutor.getTaskExecutorQueueMetricSet())));
     services.addAll(getServices());
 
     this.serviceManager = new ServiceManager(services);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/dad2a8c2/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
index 2abf131..28ea378 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
@@ -17,7 +17,13 @@
 
 package org.apache.gobblin.runtime;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -25,22 +31,32 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.gobblin.runtime.fork.Fork;
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.AbstractIdleService;
 
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.runtime.fork.Fork;
 import org.apache.gobblin.util.ExecutorsUtils;
 
 import lombok.Getter;
 
+import static com.codahale.metrics.MetricRegistry.name;
+
 
 /**
  * A class for executing {@link Task}s and retrying failed ones as well as for executing {@link Fork}s.
@@ -61,12 +77,63 @@ public class TaskExecutor extends AbstractIdleService {
   // Task retry interval
   private final long retryIntervalInSeconds;
 
+  // The maximum number of items in the queued task time map.
+  private final int queuedTaskTimeMaxSize;
+
+  // The maximum age of the items in the queued task time map.
+  private final long queuedTaskTimeMaxAge ;
+
+  // Map of queued task ids to queue times.  The key is the task id, the value is the time the task was queued.  If the
+  // task is being retried, the time may be in the future.  Entries with time in the future will not be counted as
+  // queued until the time is in the past.
+  private final Map<String, Long> queuedTasks = Maps.newConcurrentMap();
+
+  // Set of historical queued task times. The key is the UTC epoch time the task started, the value is the milliseconds
+  // the task waited to start.
+  private final ConcurrentSkipListMap<Long, Long> queuedTaskTimeHistorical = new ConcurrentSkipListMap<>();
+
+  // The timestamp for the last time the metric source data was pruned.
+  private long lastCleanupTime = 0;
+
+  // The total number of tasks currently queued and queued over the historical lookback period.
+  private AtomicInteger queuedTaskCount = new AtomicInteger();
+
+  // The total number of tasks currently queued.
+  private AtomicInteger currentQueuedTaskCount = new AtomicInteger();
+
+  // The total number of tasks queued over the historical lookback period.
+  private AtomicInteger historicalQueuedTaskCount = new AtomicInteger();
+
+  // The total time tasks have currently been in the queue and were in the queue during the historical lookback period.
+  private AtomicLong queuedTaskTotalTime = new AtomicLong();
+
+  // The total time tasks have currently been in the queue.
+  private AtomicLong currentQueuedTaskTotalTime = new AtomicLong();
+
+  // The total time tasks have been in the queue during the historical lookback period.
+  private AtomicLong historicalQueuedTaskTotalTime = new AtomicLong();
+
+  // Count of running tasks.
+  private final Counter runningTaskCount = new Counter();
+
+  // Count of successful tasks.
+  private final Meter successfulTaskCount = new Meter();
+
+  // Count of failed tasks.
+  private final Meter failedTaskCount = new Meter();
+
+  // The metric set exposed from the task executor.
+  private final TaskExecutorQueueMetricSet metricSet = new TaskExecutorQueueMetricSet();
+
   /**
    * Constructor used internally.
    */
-  private TaskExecutor(int taskExecutorThreadPoolSize, int coreRetryThreadPoolSize, long retryIntervalInSeconds) {
+  private TaskExecutor(int taskExecutorThreadPoolSize, int coreRetryThreadPoolSize, long retryIntervalInSeconds,
+                       int queuedTaskTimeMaxSize, long queuedTaskTimeMaxAge) {
     Preconditions.checkArgument(taskExecutorThreadPoolSize > 0, "Task executor thread pool size should be positive");
     Preconditions.checkArgument(retryIntervalInSeconds > 0, "Task retry interval should be positive");
+    Preconditions.checkArgument(queuedTaskTimeMaxSize > 0, "Queued task time max size should be positive");
+    Preconditions.checkArgument(queuedTaskTimeMaxAge > 0, "Queued task time max age should be positive");
 
     // Currently a fixed-size thread pool is used to execute tasks. We probably need to revisit this later.
     this.taskExecutor = Executors.newScheduledThreadPool(
@@ -74,6 +141,8 @@ public class TaskExecutor extends AbstractIdleService {
         ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("TaskExecutor-%d")));
 
     this.retryIntervalInSeconds = retryIntervalInSeconds;
+    this.queuedTaskTimeMaxSize = queuedTaskTimeMaxSize;
+    this.queuedTaskTimeMaxAge = queuedTaskTimeMaxAge;
 
     this.forkExecutor = new ThreadPoolExecutor(
         // The core thread pool size is equal to that of the task executor as there's at least one fork per task
@@ -100,7 +169,11 @@ public class TaskExecutor extends AbstractIdleService {
         Integer.parseInt(properties.getProperty(ConfigurationKeys.TASK_RETRY_THREAD_POOL_CORE_SIZE_KEY,
             Integer.toString(ConfigurationKeys.DEFAULT_TASK_RETRY_THREAD_POOL_CORE_SIZE))),
         Long.parseLong(properties.getProperty(ConfigurationKeys.TASK_RETRY_INTERVAL_IN_SEC_KEY,
-            Long.toString(ConfigurationKeys.DEFAULT_TASK_RETRY_INTERVAL_IN_SEC))));
+            Long.toString(ConfigurationKeys.DEFAULT_TASK_RETRY_INTERVAL_IN_SEC))),
+        Integer.parseInt(properties.getProperty(ConfigurationKeys.QUEUED_TASK_TIME_MAX_SIZE,
+            Integer.toString(ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_SIZE))),
+        Long.parseLong(properties.getProperty(ConfigurationKeys.QUEUED_TASK_TIME_MAX_AGE,
+            Long.toString(ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_AGE))));
   }
 
   /**
@@ -112,7 +185,11 @@ public class TaskExecutor extends AbstractIdleService {
         conf.getInt(ConfigurationKeys.TASK_RETRY_THREAD_POOL_CORE_SIZE_KEY,
             ConfigurationKeys.DEFAULT_TASK_RETRY_THREAD_POOL_CORE_SIZE),
         conf.getLong(ConfigurationKeys.TASK_RETRY_INTERVAL_IN_SEC_KEY,
-            ConfigurationKeys.DEFAULT_TASK_RETRY_INTERVAL_IN_SEC));
+            ConfigurationKeys.DEFAULT_TASK_RETRY_INTERVAL_IN_SEC),
+        conf.getInt(ConfigurationKeys.QUEUED_TASK_TIME_MAX_SIZE,
+            ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_SIZE),
+        conf.getLong(ConfigurationKeys.QUEUED_TASK_TIME_MAX_AGE,
+            ConfigurationKeys.DEFAULT_QUEUED_TASK_TIME_MAX_AGE));
   }
 
   @Override
@@ -145,7 +222,7 @@ public class TaskExecutor extends AbstractIdleService {
    */
   public void execute(Task task) {
     LOG.info(String.format("Executing task %s", task.getTaskId()));
-    this.taskExecutor.execute(task);
+    this.taskExecutor.execute(new TrackingTask(task));
   }
 
   /**
@@ -156,7 +233,7 @@ public class TaskExecutor extends AbstractIdleService {
    */
   public Future<?> submit(Task task) {
     LOG.info(String.format("Submitting task %s", task.getTaskId()));
-    return this.taskExecutor.submit(task);
+    return this.taskExecutor.submit(new TrackingTask(task));
   }
 
   /**
@@ -196,8 +273,142 @@ public class TaskExecutor extends AbstractIdleService {
     // Task retry interval increases linearly with number of retries
     long interval = task.getRetryCount() * this.retryIntervalInSeconds;
     // Schedule the retry of the failed task
-    this.taskExecutor.schedule(task, interval, TimeUnit.SECONDS);
+    this.taskExecutor.schedule(new TrackingTask(task, interval, TimeUnit.SECONDS), interval, TimeUnit.SECONDS);
     LOG.info(String.format("Scheduled retry of failed task %s to run in %d seconds", task.getTaskId(), interval));
     task.incrementRetryCount();
   }
+
+  public MetricSet getTaskExecutorQueueMetricSet() {
+    return this.metricSet;
+  }
+
+  /** Refreshes the queued-task gauge values and prunes stale history; throttled to one pass per 10 seconds. */
+  private synchronized void cleanupMetricSources() {
+    long currentTimeMillis = System.currentTimeMillis();
+    if (lastCleanupTime < currentTimeMillis - TimeUnit.SECONDS.toMillis(10)) {
+      int currentQueuedTaskCount = 0;
+      long currentQueuedTaskTotalTime = 0;
+      for (Map.Entry<String, Long> queuedTask : this.queuedTasks.entrySet()) {
+        if (queuedTask.getValue() <= currentTimeMillis) {
+          currentQueuedTaskCount++;
+          currentQueuedTaskTotalTime += currentTimeMillis - queuedTask.getValue();  // elapsed wait, not timestamp
+        }
+      }
+      this.currentQueuedTaskCount.set(currentQueuedTaskCount);
+      this.currentQueuedTaskTotalTime.set(currentQueuedTaskTotalTime);
+      // Walk history newest-first; drop entries older than the cutoff or beyond the max-size cap.
+      int historicalQueuedTaskCount = 0;
+      long historicalQueuedTaskTotalTime = 0;
+      long cutoff = currentTimeMillis - queuedTaskTimeMaxAge;
+      Iterator<Map.Entry<Long, Long>> iterator = queuedTaskTimeHistorical.descendingMap().entrySet().iterator();
+      while (iterator.hasNext()) {
+        try {
+          Map.Entry<Long, Long> historicalQueuedTask = iterator.next();
+          if (historicalQueuedTask.getKey() < cutoff || historicalQueuedTaskCount >= queuedTaskTimeMaxSize) {
+            iterator.remove();
+          } else {
+            historicalQueuedTaskCount++;
+            historicalQueuedTaskTotalTime += historicalQueuedTask.getValue();
+          }
+        } catch (NoSuchElementException e) {
+          LOG.warn("Ran out of items in historical task queue time set.");
+        }
+      }
+      this.historicalQueuedTaskCount.set(historicalQueuedTaskCount);
+      this.historicalQueuedTaskTotalTime.set(historicalQueuedTaskTotalTime);
+
+      this.queuedTaskCount.set(currentQueuedTaskCount + historicalQueuedTaskCount);
+      this.queuedTaskTotalTime.set(currentQueuedTaskTotalTime + historicalQueuedTaskTotalTime);
+      this.lastCleanupTime = currentTimeMillis;
+    } else {
+      LOG.debug("Skipped cleanup of metrics sources because not enough time has passed since last cleanup.");
+    }
+  }
+
+  private class TaskExecutorQueueMetricSet implements MetricSet {
+    @Override
+    public Map<String, Metric> getMetrics() {
+      final Map<String, Metric> metrics = new HashMap<>();
+      metrics.put(name("queued", "current", "count"), new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          cleanupMetricSources();
+          return currentQueuedTaskCount.intValue();
+        }
+      });
+      metrics.put(name("queued", "historical", "count"), new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          cleanupMetricSources();
+          return historicalQueuedTaskCount.intValue();
+        }
+      });
+      metrics.put(name("queued", "count"), new Gauge<Integer>() {
+        @Override
+        public Integer getValue() {
+          cleanupMetricSources();
+          return queuedTaskCount.intValue();
+        }
+      });
+      metrics.put(name("queued", "current", "time", "total"), new Gauge<Long>() {
+        @Override
+        public Long getValue() {
+          cleanupMetricSources();
+          return currentQueuedTaskTotalTime.longValue();
+        }
+      });
+      metrics.put(name("queued", "historical", "time", "total"), new Gauge<Long>() {
+        @Override
+        public Long getValue() {
+          cleanupMetricSources();
+          return historicalQueuedTaskTotalTime.longValue();
+        }
+      });
+      metrics.put(name("queued", "time", "total"), new Gauge<Long>() {
+        @Override
+        public Long getValue() {
+          cleanupMetricSources();
+          return queuedTaskTotalTime.longValue();
+        }
+      });
+      metrics.put(name("running", "count"), runningTaskCount);
+      metrics.put(name("successful", "count"), successfulTaskCount);
+      metrics.put(name("failed", "count"), failedTaskCount);
+      return Collections.unmodifiableMap(metrics);
+    }
+  }
+
+  /** Runnable wrapper around a {@link Task} that records queue wait time, running count and success/failure. */
+  private class TrackingTask implements Runnable {
+    private final Task underlyingTask;
+
+    public TrackingTask(Task task) {
+      this(task, 0, TimeUnit.SECONDS);
+    }
+
+    public TrackingTask(Task task, long interval, TimeUnit timeUnit) {
+      queuedTasks.putIfAbsent(task.getTaskId(), System.currentTimeMillis() + timeUnit.toMillis(interval));
+      this.underlyingTask = task;
+    }
+
+    @Override
+    public void run() {
+      long startTime = System.currentTimeMillis();
+      Long queueTime = queuedTasks.remove(this.underlyingTask.getTaskId());
+      // No entry if a wrapper with the same task id already started; skip the sample rather than NPE on unboxing.
+      if (queueTime != null) {
+        queuedTaskTimeHistorical.putIfAbsent(startTime, startTime - queueTime);
+      }
+      runningTaskCount.inc();
+      try {
+        this.underlyingTask.run();
+        successfulTaskCount.mark();
+      } catch (Exception e) {
+        failedTaskCount.mark();
+        throw e;
+      } finally {
+        runningTaskCount.dec();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/dad2a8c2/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/JMXReportingService.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/JMXReportingService.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/JMXReportingService.java
index 2f3800b..99d62ae 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/JMXReportingService.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/services/JMXReportingService.java
@@ -29,6 +29,7 @@ import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
 import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
 import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.AbstractIdleService;
 
 
@@ -51,11 +52,20 @@ import com.google.common.util.concurrent.AbstractIdleService;
 public class JMXReportingService extends AbstractIdleService {
 
   private final MetricRegistry metricRegistry = new MetricRegistry();
+  private Map<String, MetricSet> additionalMetricSets;
   private final JmxReporter jmxReporter = JmxReporter.forRegistry(this.metricRegistry)
       .convertRatesTo(TimeUnit.SECONDS)
       .convertDurationsTo(TimeUnit.MILLISECONDS)
       .build();
 
+  public JMXReportingService() {
+    this(ImmutableMap.of());
+  }
+
+  public JMXReportingService(Map<String, MetricSet> additionalMetricSets) {
+    this.additionalMetricSets = additionalMetricSets;
+  }
+
   @Override
   protected void startUp() throws Exception {
     registerJvmMetrics();
@@ -72,6 +82,9 @@ public class JMXReportingService extends AbstractIdleService {
     registerMetricSetWithPrefix("jvm.memory", new MemoryUsageGaugeSet());
     registerMetricSetWithPrefix("jvm.threads", new ThreadStatesGaugeSet());
     this.metricRegistry.register("jvm.fileDescriptorRatio", new FileDescriptorRatioGauge());
+    for (Map.Entry<String, MetricSet> metricSet : this.additionalMetricSets.entrySet()) {
+      registerMetricSetWithPrefix(metricSet.getKey(), metricSet.getValue());
+    }
   }
 
   private void registerMetricSetWithPrefix(String prefix, MetricSet metricSet) {