You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/12/04 21:37:19 UTC

[GitHub] gianm closed pull request #6657: Add TaskCountStatsMonitor to monitor task count stats

gianm closed pull request #6657: Add TaskCountStatsMonitor to monitor task count stats
URL: https://github.com/apache/incubator-druid/pull/6657
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of a fork's pull request once it has been merged,
the diff is reproduced below for the sake of provenance:

diff --git a/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java b/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
index 8a41db527ba..38f715f848a 100644
--- a/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
+++ b/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
@@ -25,7 +25,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
-class StubServiceEmitter extends ServiceEmitter
+public class StubServiceEmitter extends ServiceEmitter
 {
   private List<Event> events = new ArrayList<>();
 
diff --git a/docs/content/operations/metrics.md b/docs/content/operations/metrics.md
index e3270230aaa..c9e8a3a0638 100644
--- a/docs/content/operations/metrics.md
+++ b/docs/content/operations/metrics.md
@@ -163,6 +163,11 @@ Note: If the JVM does not support CPU time measurement for the current thread, i
 |`segment/added/bytes`|Size in bytes of new segments created.|dataSource, taskId, taskType, interval.|Varies.|
 |`segment/moved/bytes`|Size in bytes of segments moved/archived via the Move Task.|dataSource, taskId, taskType, interval.|Varies.|
 |`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.|dataSource, taskId, taskType, interval.|Varies.|
+|`task/success/count`|Number of successful tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
+|`task/failed/count`|Number of failed tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
+|`task/running/count`|Number of currently running tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
+|`task/pending/count`|Number of currently pending tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
+|`task/waiting/count`|Number of currently waiting tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
 
 ## Coordination
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
index 4428661513a..25fe06fe608 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
@@ -41,14 +41,16 @@
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;
+import org.apache.druid.server.metrics.TaskCountStatsProvider;
 
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Encapsulates the indexer leadership lifecycle.
  */
-public class TaskMaster
+public class TaskMaster implements TaskCountStatsProvider
 {
   private static final EmittingLogger log = new EmittingLogger(TaskMaster.class);
 
@@ -265,4 +267,59 @@ public String getCurrentLeader()
       return Optional.absent();
     }
   }
+
+  @Override
+  public Map<String, Long> getSuccessfulTaskCount()
+  {
+    Optional<TaskQueue> taskQueue = getTaskQueue();
+    if (taskQueue.isPresent()) {
+      return taskQueue.get().getSuccessfulTaskCount();
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public Map<String, Long> getFailedTaskCount()
+  {
+    Optional<TaskQueue> taskQueue = getTaskQueue();
+    if (taskQueue.isPresent()) {
+      return taskQueue.get().getFailedTaskCount();
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public Map<String, Long> getRunningTaskCount()
+  {
+    Optional<TaskQueue> taskQueue = getTaskQueue();
+    if (taskQueue.isPresent()) {
+      return taskQueue.get().getRunningTaskCount();
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public Map<String, Long> getPendingTaskCount()
+  {
+    Optional<TaskQueue> taskQueue = getTaskQueue();
+    if (taskQueue.isPresent()) {
+      return taskQueue.get().getPendingTaskCount();
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public Map<String, Long> getWaitingTaskCount()
+  {
+    Optional<TaskQueue> taskQueue = getTaskQueue();
+    if (taskQueue.isPresent()) {
+      return taskQueue.get().getWaitingTaskCount();
+    } else {
+      return null;
+    }
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index 9a9c41dfbb8..ef30f15f225 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -53,12 +53,15 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 /**
  * Interface between task producers and the task runner.
@@ -100,6 +103,11 @@
 
   private static final EmittingLogger log = new EmittingLogger(TaskQueue.class);
 
+  private final Map<String, AtomicLong> totalSuccessfulTaskCount = new ConcurrentHashMap<>();
+  private final Map<String, AtomicLong> totalFailedTaskCount = new ConcurrentHashMap<>();
+  private Map<String, Long> prevTotalSuccessfulTaskCount = new HashMap<>();
+  private Map<String, Long> prevTotalFailedTaskCount = new HashMap<>();
+
   @Inject
   public TaskQueue(
       TaskQueueConfig config,
@@ -510,6 +518,14 @@ private void handleStatus(final TaskStatus status)
                     task,
                     status.getDuration()
                 );
+
+                if (status.isSuccess()) {
+                  totalSuccessfulTaskCount.computeIfAbsent(task.getDataSource(), k -> new AtomicLong())
+                                          .incrementAndGet();
+                } else {
+                  totalFailedTaskCount.computeIfAbsent(task.getDataSource(), k -> new AtomicLong())
+                                      .incrementAndGet();
+                }
               }
             }
             catch (Exception e) {
@@ -586,4 +602,70 @@ private void syncFromStorage()
     return rv;
   }
 
+  private Map<String, Long> getDeltaValues(Map<String, Long> total, Map<String, Long> prev)
+  {
+    return total.entrySet()
+                .stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() - prev.getOrDefault(e.getKey(), 0L)));
+  }
+
+  public Map<String, Long> getSuccessfulTaskCount()
+  {
+    Map<String, Long> total = totalSuccessfulTaskCount.entrySet()
+                                                      .stream()
+                                                      .collect(Collectors.toMap(
+                                                          Map.Entry::getKey,
+                                                          e -> e.getValue().get()
+                                                      ));
+    Map<String, Long> delta = getDeltaValues(total, prevTotalSuccessfulTaskCount);
+    prevTotalSuccessfulTaskCount = total;
+    return delta;
+  }
+
+  public Map<String, Long> getFailedTaskCount()
+  {
+    Map<String, Long> total = totalFailedTaskCount.entrySet()
+                                                  .stream()
+                                                  .collect(Collectors.toMap(
+                                                      Map.Entry::getKey,
+                                                      e -> e.getValue().get()
+                                                  ));
+    Map<String, Long> delta = getDeltaValues(total, prevTotalFailedTaskCount);
+    prevTotalFailedTaskCount = total;
+    return delta;
+  }
+
+  public Map<String, Long> getRunningTaskCount()
+  {
+    Map<String, String> taskDatasources = tasks.stream().collect(Collectors.toMap(Task::getId, Task::getDataSource));
+    return taskRunner.getRunningTasks()
+                     .stream()
+                     .collect(Collectors.toMap(
+                         e -> taskDatasources.getOrDefault(e.getTaskId(), ""),
+                         e -> 1L,
+                         Long::sum
+                     ));
+  }
+
+  public Map<String, Long> getPendingTaskCount()
+  {
+    Map<String, String> taskDatasources = tasks.stream().collect(Collectors.toMap(Task::getId, Task::getDataSource));
+    return taskRunner.getPendingTasks()
+                     .stream()
+                     .collect(Collectors.toMap(
+                         e -> taskDatasources.getOrDefault(e.getTaskId(), ""),
+                         e -> 1L,
+                         Long::sum
+                     ));
+  }
+
+  public Map<String, Long> getWaitingTaskCount()
+  {
+    Set<String> runnerKnownTaskIds = taskRunner.getKnownTasks()
+                                               .stream()
+                                               .map(TaskRunnerWorkItem::getTaskId)
+                                               .collect(Collectors.toSet());
+    return tasks.stream().filter(task -> !runnerKnownTaskIds.contains(task.getId()))
+                .collect(Collectors.toMap(Task::getDataSource, task -> 1L, Long::sum));
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java
new file mode 100644
index 00000000000..6c3394e9775
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.google.inject.Inject;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.metrics.AbstractMonitor;
+
+import java.util.Map;
+
+public class TaskCountStatsMonitor extends AbstractMonitor
+{
+  private final TaskCountStatsProvider statsProvider;
+
+  @Inject
+  public TaskCountStatsMonitor(
+      TaskCountStatsProvider statsProvider
+  )
+  {
+    this.statsProvider = statsProvider;
+  }
+
+  @Override
+  public boolean doMonitor(ServiceEmitter emitter)
+  {
+    emit(emitter, "task/success/count", statsProvider.getSuccessfulTaskCount());
+    emit(emitter, "task/failed/count", statsProvider.getFailedTaskCount());
+    emit(emitter, "task/running/count", statsProvider.getRunningTaskCount());
+    emit(emitter, "task/pending/count", statsProvider.getPendingTaskCount());
+    emit(emitter, "task/waiting/count", statsProvider.getWaitingTaskCount());
+    return true;
+  }
+
+  private void emit(ServiceEmitter emitter, String key, Map<String, Long> counts)
+  {
+    final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
+    if (counts != null) {
+      counts.forEach((k, v) -> {
+        builder.setDimension("dataSource", k);
+        emitter.emit(builder.build(key, v));
+      });
+    }
+  }
+
+}
diff --git a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java
new file mode 100644
index 00000000000..a96f8ce0623
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import java.util.Map;
+
+public interface TaskCountStatsProvider
+{
+  /**
+   * Return the number of successful tasks for each datasource during the emission period.
+   */
+  Map<String, Long> getSuccessfulTaskCount();
+
+  /**
+   * Return the number of failed tasks for each datasource during the emission period.
+   */
+  Map<String, Long> getFailedTaskCount();
+
+  /**
+   * Return the number of currently running tasks for each datasource.
+   */
+  Map<String, Long> getRunningTaskCount();
+
+  /**
+   * Return the number of currently pending tasks for each datasource.
+   */
+  Map<String, Long> getPendingTaskCount();
+
+  /**
+   * Return the number of currently waiting tasks for each datasource.
+   */
+  Map<String, Long> getWaitingTaskCount();
+}
diff --git a/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java
new file mode 100644
index 00000000000..24d89ab957a
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class TaskCountStatsMonitorTest
+{
+  private TaskCountStatsProvider statsProvider;
+
+  @Before
+  public void setUp()
+  {
+    statsProvider = new TaskCountStatsProvider()
+    {
+      @Override
+      public Map<String, Long> getSuccessfulTaskCount()
+      {
+        return ImmutableMap.of("d1", 1L);
+      }
+
+      @Override
+      public Map<String, Long> getFailedTaskCount()
+      {
+        return ImmutableMap.of("d1", 1L);
+      }
+
+      @Override
+      public Map<String, Long> getRunningTaskCount()
+      {
+        return ImmutableMap.of("d1", 1L);
+      }
+
+      @Override
+      public Map<String, Long> getPendingTaskCount()
+      {
+        return ImmutableMap.of("d1", 1L);
+      }
+
+      @Override
+      public Map<String, Long> getWaitingTaskCount()
+      {
+        return ImmutableMap.of("d1", 1L);
+      }
+    };
+  }
+
+  @Test
+  public void testMonitor()
+  {
+    final TaskCountStatsMonitor monitor = new TaskCountStatsMonitor(statsProvider);
+    final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+    monitor.doMonitor(emitter);
+    Assert.assertEquals(5, emitter.getEvents().size());
+    Assert.assertEquals("task/success/count", emitter.getEvents().get(0).toMap().get("metric"));
+    Assert.assertEquals(1L, emitter.getEvents().get(0).toMap().get("value"));
+    Assert.assertEquals("task/failed/count", emitter.getEvents().get(1).toMap().get("metric"));
+    Assert.assertEquals(1L, emitter.getEvents().get(1).toMap().get("value"));
+    Assert.assertEquals("task/running/count", emitter.getEvents().get(2).toMap().get("metric"));
+    Assert.assertEquals(1L, emitter.getEvents().get(2).toMap().get("value"));
+    Assert.assertEquals("task/pending/count", emitter.getEvents().get(3).toMap().get("metric"));
+    Assert.assertEquals(1L, emitter.getEvents().get(3).toMap().get("value"));
+    Assert.assertEquals("task/waiting/count", emitter.getEvents().get(4).toMap().get("metric"));
+    Assert.assertEquals(1L, emitter.getEvents().get(4).toMap().get("value"));
+  }
+}
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index 038ad837c4d..540722195e8 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -100,6 +100,7 @@
 import org.apache.druid.server.initialization.ServerConfig;
 import org.apache.druid.server.initialization.jetty.JettyServerInitUtils;
 import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
+import org.apache.druid.server.metrics.TaskCountStatsProvider;
 import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthenticationUtils;
 import org.apache.druid.server.security.Authenticator;
@@ -170,6 +171,7 @@ public void configure(Binder binder)
             JsonConfigProvider.bind(binder, "druid.indexer.auditlog", TaskAuditLogConfig.class);
 
             binder.bind(TaskMaster.class).in(ManageLifecycle.class);
+            binder.bind(TaskCountStatsProvider.class).to(TaskMaster.class);
 
             binder.bind(TaskLogStreamer.class).to(SwitchingTaskLogStreamer.class).in(LazySingleton.class);
             binder.bind(


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org