You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2018/12/04 21:37:23 UTC

[incubator-druid] branch master updated: Add TaskCountStatsMonitor to monitor task count stats (#6657)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 6073390  Add TaskCountStatsMonitor to monitor task count stats (#6657)
6073390 is described below

commit 607339003b2c31ff8d2a047d4afdc563f4d05675
Author: Mingming Qiu <cs...@gmail.com>
AuthorDate: Wed Dec 5 05:37:17 2018 +0800

    Add TaskCountStatsMonitor to monitor task count stats (#6657)
    
    * Add TaskCountStatsMonitor to monitor task count stats
    
    * address comments
    
    * add file header
    
    * tweak test
---
 .../java/util/metrics/StubServiceEmitter.java      |  2 +-
 docs/content/operations/metrics.md                 |  5 ++
 .../apache/druid/indexing/overlord/TaskMaster.java | 59 +++++++++++++-
 .../apache/druid/indexing/overlord/TaskQueue.java  | 82 ++++++++++++++++++++
 .../server/metrics/TaskCountStatsMonitor.java      | 63 +++++++++++++++
 .../server/metrics/TaskCountStatsProvider.java     | 66 +++++++---------
 .../server/metrics/TaskCountStatsMonitorTest.java  | 89 ++++++++++++++++++++++
 .../java/org/apache/druid/cli/CliOverlord.java     |  2 +
 8 files changed, 327 insertions(+), 41 deletions(-)

diff --git a/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java b/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
index 8a41db5..38f715f 100644
--- a/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
+++ b/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
@@ -25,7 +25,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import java.util.ArrayList;
 import java.util.List;
 
-class StubServiceEmitter extends ServiceEmitter
+public class StubServiceEmitter extends ServiceEmitter
 {
   private List<Event> events = new ArrayList<>();
 
diff --git a/docs/content/operations/metrics.md b/docs/content/operations/metrics.md
index d99d3e5..75c38e2 100644
--- a/docs/content/operations/metrics.md
+++ b/docs/content/operations/metrics.md
@@ -165,6 +165,11 @@ Note: If the JVM does not support CPU time measurement for the current thread, i
 |`segment/added/bytes`|Size in bytes of new segments created.|dataSource, taskId, taskType, interval.|Varies.|
 |`segment/moved/bytes`|Size in bytes of segments moved/archived via the Move Task.|dataSource, taskId, taskType, interval.|Varies.|
 |`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.|dataSource, taskId, taskType, interval.|Varies.|
+|`task/success/count`|Number of successful tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
+|`task/failed/count`|Number of failed tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
+|`task/running/count`|Number of currently running tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
+|`task/pending/count`|Number of currently pending tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
+|`task/waiting/count`|Number of currently waiting tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
 
 ## Coordination
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
index 4428661..25fe06f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
@@ -41,14 +41,16 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;
+import org.apache.druid.server.metrics.TaskCountStatsProvider;
 
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Encapsulates the indexer leadership lifecycle.
  */
-public class TaskMaster
+public class TaskMaster implements TaskCountStatsProvider
 {
   private static final EmittingLogger log = new EmittingLogger(TaskMaster.class);
 
@@ -265,4 +267,59 @@ public class TaskMaster
       return Optional.absent();
     }
   }
+
+  @Override
+  public Map<String, Long> getSuccessfulTaskCount()
+  {
+    Optional<TaskQueue> taskQueue = getTaskQueue();
+    if (taskQueue.isPresent()) {
+      return taskQueue.get().getSuccessfulTaskCount();
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public Map<String, Long> getFailedTaskCount()
+  {
+    Optional<TaskQueue> taskQueue = getTaskQueue();
+    if (taskQueue.isPresent()) {
+      return taskQueue.get().getFailedTaskCount();
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public Map<String, Long> getRunningTaskCount()
+  {
+    Optional<TaskQueue> taskQueue = getTaskQueue();
+    if (taskQueue.isPresent()) {
+      return taskQueue.get().getRunningTaskCount();
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public Map<String, Long> getPendingTaskCount()
+  {
+    Optional<TaskQueue> taskQueue = getTaskQueue();
+    if (taskQueue.isPresent()) {
+      return taskQueue.get().getPendingTaskCount();
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public Map<String, Long> getWaitingTaskCount()
+  {
+    Optional<TaskQueue> taskQueue = getTaskQueue();
+    if (taskQueue.isPresent()) {
+      return taskQueue.get().getWaitingTaskCount();
+    } else {
+      return null;
+    }
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index 9a9c41d..ef30f15 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -53,12 +53,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 /**
  * Interface between task producers and the task runner.
@@ -100,6 +103,11 @@ public class TaskQueue
 
   private static final EmittingLogger log = new EmittingLogger(TaskQueue.class);
 
+  private final Map<String, AtomicLong> totalSuccessfulTaskCount = new ConcurrentHashMap<>();
+  private final Map<String, AtomicLong> totalFailedTaskCount = new ConcurrentHashMap<>();
+  private Map<String, Long> prevTotalSuccessfulTaskCount = new HashMap<>();
+  private Map<String, Long> prevTotalFailedTaskCount = new HashMap<>();
+
   @Inject
   public TaskQueue(
       TaskQueueConfig config,
@@ -510,6 +518,14 @@ public class TaskQueue
                     task,
                     status.getDuration()
                 );
+
+                if (status.isSuccess()) {
+                  totalSuccessfulTaskCount.computeIfAbsent(task.getDataSource(), k -> new AtomicLong())
+                                          .incrementAndGet();
+                } else {
+                  totalFailedTaskCount.computeIfAbsent(task.getDataSource(), k -> new AtomicLong())
+                                      .incrementAndGet();
+                }
               }
             }
             catch (Exception e) {
@@ -586,4 +602,70 @@ public class TaskQueue
     return rv;
   }
 
+  private Map<String, Long> getDeltaValues(Map<String, Long> total, Map<String, Long> prev)
+  {
+    return total.entrySet()
+                .stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() - prev.getOrDefault(e.getKey(), 0L)));
+  }
+
+  public Map<String, Long> getSuccessfulTaskCount()
+  {
+    Map<String, Long> total = totalSuccessfulTaskCount.entrySet()
+                                                      .stream()
+                                                      .collect(Collectors.toMap(
+                                                          Map.Entry::getKey,
+                                                          e -> e.getValue().get()
+                                                      ));
+    Map<String, Long> delta = getDeltaValues(total, prevTotalSuccessfulTaskCount);
+    prevTotalSuccessfulTaskCount = total;
+    return delta;
+  }
+
+  public Map<String, Long> getFailedTaskCount()
+  {
+    Map<String, Long> total = totalFailedTaskCount.entrySet()
+                                                  .stream()
+                                                  .collect(Collectors.toMap(
+                                                      Map.Entry::getKey,
+                                                      e -> e.getValue().get()
+                                                  ));
+    Map<String, Long> delta = getDeltaValues(total, prevTotalFailedTaskCount);
+    prevTotalFailedTaskCount = total;
+    return delta;
+  }
+
+  public Map<String, Long> getRunningTaskCount()
+  {
+    Map<String, String> taskDatasources = tasks.stream().collect(Collectors.toMap(Task::getId, Task::getDataSource));
+    return taskRunner.getRunningTasks()
+                     .stream()
+                     .collect(Collectors.toMap(
+                         e -> taskDatasources.getOrDefault(e.getTaskId(), ""),
+                         e -> 1L,
+                         Long::sum
+                     ));
+  }
+
+  public Map<String, Long> getPendingTaskCount()
+  {
+    Map<String, String> taskDatasources = tasks.stream().collect(Collectors.toMap(Task::getId, Task::getDataSource));
+    return taskRunner.getPendingTasks()
+                     .stream()
+                     .collect(Collectors.toMap(
+                         e -> taskDatasources.getOrDefault(e.getTaskId(), ""),
+                         e -> 1L,
+                         Long::sum
+                     ));
+  }
+
+  public Map<String, Long> getWaitingTaskCount()
+  {
+    Set<String> runnerKnownTaskIds = taskRunner.getKnownTasks()
+                                               .stream()
+                                               .map(TaskRunnerWorkItem::getTaskId)
+                                               .collect(Collectors.toSet());
+    return tasks.stream().filter(task -> !runnerKnownTaskIds.contains(task.getId()))
+                .collect(Collectors.toMap(Task::getDataSource, task -> 1L, Long::sum));
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java
new file mode 100644
index 0000000..6c3394e
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.google.inject.Inject;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.metrics.AbstractMonitor;
+
+import java.util.Map;
+
+public class TaskCountStatsMonitor extends AbstractMonitor
+{
+  private final TaskCountStatsProvider statsProvider;
+
+  @Inject
+  public TaskCountStatsMonitor(
+      TaskCountStatsProvider statsProvider
+  )
+  {
+    this.statsProvider = statsProvider;
+  }
+
+  @Override
+  public boolean doMonitor(ServiceEmitter emitter)
+  {
+    emit(emitter, "task/success/count", statsProvider.getSuccessfulTaskCount());
+    emit(emitter, "task/failed/count", statsProvider.getFailedTaskCount());
+    emit(emitter, "task/running/count", statsProvider.getRunningTaskCount());
+    emit(emitter, "task/pending/count", statsProvider.getPendingTaskCount());
+    emit(emitter, "task/waiting/count", statsProvider.getWaitingTaskCount());
+    return true;
+  }
+
+  private void emit(ServiceEmitter emitter, String key, Map<String, Long> counts)
+  {
+    final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
+    if (counts != null) {
+      counts.forEach((k, v) -> {
+        builder.setDimension("dataSource", k);
+        emitter.emit(builder.build(key, v));
+      });
+    }
+  }
+
+}
diff --git a/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java
similarity index 51%
copy from core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
copy to server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java
index 8a41db5..a96f8ce 100644
--- a/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
+++ b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java
@@ -17,46 +17,34 @@
  * under the License.
  */
 
-package org.apache.druid.java.util.metrics;
+package org.apache.druid.server.metrics;
 
-import org.apache.druid.java.util.emitter.core.Event;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import java.util.Map;
 
-import java.util.ArrayList;
-import java.util.List;
-
-class StubServiceEmitter extends ServiceEmitter
+public interface TaskCountStatsProvider
 {
-  private List<Event> events = new ArrayList<>();
-
-  public StubServiceEmitter(String service, String host)
-  {
-    super(service, host, null);
-  }
-
-  @Override
-  public void emit(Event event)
-  {
-    events.add(event);
-  }
-
-  public List<Event> getEvents()
-  {
-    return events;
-  }
-
-  @Override
-  public void start()
-  {
-  }
-
-  @Override
-  public void flush()
-  {
-  }
-
-  @Override
-  public void close()
-  {
-  }
+  /**
+   * Return the number of successful tasks for each datasource during the emission period.
+   */
+  Map<String, Long> getSuccessfulTaskCount();
+
+  /**
+   * Return the number of failed tasks for each datasource during the emission period.
+   */
+  Map<String, Long> getFailedTaskCount();
+
+  /**
+   * Return the number of currently running tasks for each datasource.
+   */
+  Map<String, Long> getRunningTaskCount();
+
+  /**
+   * Return the number of currently pending tasks for each datasource.
+   */
+  Map<String, Long> getPendingTaskCount();
+
+  /**
+   * Return the number of currently waiting tasks for each datasource.
+   */
+  Map<String, Long> getWaitingTaskCount();
 }
diff --git a/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java
new file mode 100644
index 0000000..24d89ab
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class TaskCountStatsMonitorTest
+{
+  private TaskCountStatsProvider statsProvider;
+
+  @Before
+  public void setUp()
+  {
+    statsProvider = new TaskCountStatsProvider()
+    {
+      @Override
+      public Map<String, Long> getSuccessfulTaskCount()
+      {
+        return ImmutableMap.of("d1", 1L);
+      }
+
+      @Override
+      public Map<String, Long> getFailedTaskCount()
+      {
+        return ImmutableMap.of("d1", 1L);
+      }
+
+      @Override
+      public Map<String, Long> getRunningTaskCount()
+      {
+        return ImmutableMap.of("d1", 1L);
+      }
+
+      @Override
+      public Map<String, Long> getPendingTaskCount()
+      {
+        return ImmutableMap.of("d1", 1L);
+      }
+
+      @Override
+      public Map<String, Long> getWaitingTaskCount()
+      {
+        return ImmutableMap.of("d1", 1L);
+      }
+    };
+  }
+
+  @Test
+  public void testMonitor()
+  {
+    final TaskCountStatsMonitor monitor = new TaskCountStatsMonitor(statsProvider);
+    final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+    monitor.doMonitor(emitter);
+    Assert.assertEquals(5, emitter.getEvents().size());
+    Assert.assertEquals("task/success/count", emitter.getEvents().get(0).toMap().get("metric"));
+    Assert.assertEquals(1L, emitter.getEvents().get(0).toMap().get("value"));
+    Assert.assertEquals("task/failed/count", emitter.getEvents().get(1).toMap().get("metric"));
+    Assert.assertEquals(1L, emitter.getEvents().get(1).toMap().get("value"));
+    Assert.assertEquals("task/running/count", emitter.getEvents().get(2).toMap().get("metric"));
+    Assert.assertEquals(1L, emitter.getEvents().get(2).toMap().get("value"));
+    Assert.assertEquals("task/pending/count", emitter.getEvents().get(3).toMap().get("metric"));
+    Assert.assertEquals(1L, emitter.getEvents().get(3).toMap().get("value"));
+    Assert.assertEquals("task/waiting/count", emitter.getEvents().get(4).toMap().get("metric"));
+    Assert.assertEquals(1L, emitter.getEvents().get(4).toMap().get("value"));
+  }
+}
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index 038ad83..5407221 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -100,6 +100,7 @@ import org.apache.druid.server.http.RedirectInfo;
 import org.apache.druid.server.initialization.ServerConfig;
 import org.apache.druid.server.initialization.jetty.JettyServerInitUtils;
 import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
+import org.apache.druid.server.metrics.TaskCountStatsProvider;
 import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthenticationUtils;
 import org.apache.druid.server.security.Authenticator;
@@ -170,6 +171,7 @@ public class CliOverlord extends ServerRunnable
             JsonConfigProvider.bind(binder, "druid.indexer.auditlog", TaskAuditLogConfig.class);
 
             binder.bind(TaskMaster.class).in(ManageLifecycle.class);
+            binder.bind(TaskCountStatsProvider.class).to(TaskMaster.class);
 
             binder.bind(TaskLogStreamer.class).to(SwitchingTaskLogStreamer.class).in(LazySingleton.class);
             binder.bind(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org