You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2018/12/04 21:37:23 UTC
[incubator-druid] branch master updated: Add TaskCountStatsMonitor
to monitor task count stats (#6657)
This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 6073390 Add TaskCountStatsMonitor to monitor task count stats (#6657)
6073390 is described below
commit 607339003b2c31ff8d2a047d4afdc563f4d05675
Author: Mingming Qiu <cs...@gmail.com>
AuthorDate: Wed Dec 5 05:37:17 2018 +0800
Add TaskCountStatsMonitor to monitor task count stats (#6657)
* Add TaskCountStatsMonitor to monitor task count stats
* address comments
* add file header
* tweak test
---
.../java/util/metrics/StubServiceEmitter.java | 2 +-
docs/content/operations/metrics.md | 5 ++
.../apache/druid/indexing/overlord/TaskMaster.java | 59 +++++++++++++-
.../apache/druid/indexing/overlord/TaskQueue.java | 82 ++++++++++++++++++++
.../server/metrics/TaskCountStatsMonitor.java | 63 +++++++++++++++
.../server/metrics/TaskCountStatsProvider.java | 66 +++++++---------
.../server/metrics/TaskCountStatsMonitorTest.java | 89 ++++++++++++++++++++++
.../java/org/apache/druid/cli/CliOverlord.java | 2 +
8 files changed, 327 insertions(+), 41 deletions(-)
diff --git a/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java b/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
index 8a41db5..38f715f 100644
--- a/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
+++ b/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
@@ -25,7 +25,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import java.util.ArrayList;
import java.util.List;
-class StubServiceEmitter extends ServiceEmitter
+public class StubServiceEmitter extends ServiceEmitter
{
private List<Event> events = new ArrayList<>();
diff --git a/docs/content/operations/metrics.md b/docs/content/operations/metrics.md
index d99d3e5..75c38e2 100644
--- a/docs/content/operations/metrics.md
+++ b/docs/content/operations/metrics.md
@@ -165,6 +165,11 @@ Note: If the JVM does not support CPU time measurement for the current thread, i
|`segment/added/bytes`|Size in bytes of new segments created.|dataSource, taskId, taskType, interval.|Varies.|
|`segment/moved/bytes`|Size in bytes of segments moved/archived via the Move Task.|dataSource, taskId, taskType, interval.|Varies.|
|`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.|dataSource, taskId, taskType, interval.|Varies.|
+|`task/success/count`|Number of successful tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
+|`task/failed/count`|Number of failed tasks per emission period. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
+|`task/running/count`|Number of currently running tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
+|`task/pending/count`|Number of currently pending tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
+|`task/waiting/count`|Number of currently waiting tasks. This metric is only available if the TaskCountStatsMonitor module is included.|dataSource.|Varies.|
## Coordination
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
index 4428661..25fe06f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
@@ -41,14 +41,16 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;
+import org.apache.druid.server.metrics.TaskCountStatsProvider;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
/**
* Encapsulates the indexer leadership lifecycle.
*/
-public class TaskMaster
+public class TaskMaster implements TaskCountStatsProvider
{
private static final EmittingLogger log = new EmittingLogger(TaskMaster.class);
@@ -265,4 +267,59 @@ public class TaskMaster
return Optional.absent();
}
}
+
+  /**
+   * Per-datasource successful task counts from the current {@link TaskQueue}.
+   *
+   * @return the queue's counts, or {@code null} when {@link #getTaskQueue()} yields no queue
+   */
+  @Override
+  public Map<String, Long> getSuccessfulTaskCount()
+  {
+    final Optional<TaskQueue> queue = getTaskQueue();
+    return queue.isPresent() ? queue.get().getSuccessfulTaskCount() : null;
+  }
+
+  /**
+   * Per-datasource failed task counts from the current {@link TaskQueue}.
+   *
+   * @return the queue's counts, or {@code null} when {@link #getTaskQueue()} yields no queue
+   */
+  @Override
+  public Map<String, Long> getFailedTaskCount()
+  {
+    final Optional<TaskQueue> queue = getTaskQueue();
+    return queue.isPresent() ? queue.get().getFailedTaskCount() : null;
+  }
+
+  /**
+   * Per-datasource running task counts from the current {@link TaskQueue}.
+   *
+   * @return the queue's counts, or {@code null} when {@link #getTaskQueue()} yields no queue
+   */
+  @Override
+  public Map<String, Long> getRunningTaskCount()
+  {
+    final Optional<TaskQueue> queue = getTaskQueue();
+    return queue.isPresent() ? queue.get().getRunningTaskCount() : null;
+  }
+
+  /**
+   * Per-datasource pending task counts from the current {@link TaskQueue}.
+   *
+   * @return the queue's counts, or {@code null} when {@link #getTaskQueue()} yields no queue
+   */
+  @Override
+  public Map<String, Long> getPendingTaskCount()
+  {
+    final Optional<TaskQueue> queue = getTaskQueue();
+    return queue.isPresent() ? queue.get().getPendingTaskCount() : null;
+  }
+
+  /**
+   * Per-datasource waiting task counts from the current {@link TaskQueue}.
+   *
+   * @return the queue's counts, or {@code null} when {@link #getTaskQueue()} yields no queue
+   */
+  @Override
+  public Map<String, Long> getWaitingTaskCount()
+  {
+    final Optional<TaskQueue> queue = getTaskQueue();
+    return queue.isPresent() ? queue.get().getWaitingTaskCount() : null;
+  }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index 9a9c41d..ef30f15 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -53,12 +53,15 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
/**
* Interface between task producers and the task runner.
@@ -100,6 +103,11 @@ public class TaskQueue
private static final EmittingLogger log = new EmittingLogger(TaskQueue.class);
+ // Cumulative per-datasource counts of completed tasks, split by whether the final status was a success.
+ // Only ever incremented (via computeIfAbsent(...).incrementAndGet()); concurrent types because they are
+ // written on the task-completion path and read by the metrics getters.
+ private final Map<String, AtomicLong> totalSuccessfulTaskCount = new ConcurrentHashMap<>();
+ private final Map<String, AtomicLong> totalFailedTaskCount = new ConcurrentHashMap<>();
+ // Snapshot of the cumulative totals taken by the previous getSuccessfulTaskCount()/getFailedTaskCount()
+ // call; each call replaces it so that the getters report per-call deltas rather than running totals.
+ private Map<String, Long> prevTotalSuccessfulTaskCount = new HashMap<>();
+ private Map<String, Long> prevTotalFailedTaskCount = new HashMap<>();
+
@Inject
public TaskQueue(
TaskQueueConfig config,
@@ -510,6 +518,14 @@ public class TaskQueue
task,
status.getDuration()
);
+
+ if (status.isSuccess()) {
+ totalSuccessfulTaskCount.computeIfAbsent(task.getDataSource(), k -> new AtomicLong())
+ .incrementAndGet();
+ } else {
+ totalFailedTaskCount.computeIfAbsent(task.getDataSource(), k -> new AtomicLong())
+ .incrementAndGet();
+ }
}
}
catch (Exception e) {
@@ -586,4 +602,70 @@ public class TaskQueue
return rv;
}
+ private Map<String, Long> getDeltaValues(Map<String, Long> total, Map<String, Long> prev)
+ {
+ return total.entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() - prev.getOrDefault(e.getKey(), 0L)));
+ }
+
+  /**
+   * Returns, per datasource, how many tasks completed successfully since the previous call.
+   * <p>
+   * Each call snapshots the cumulative counters, diffs them against the previous snapshot and then stores the
+   * new snapshot as the baseline. That read-modify-write is serialized on {@code totalSuccessfulTaskCount}:
+   * without it, two concurrent callers could both report the same completions, or an older snapshot could
+   * overwrite a newer baseline and make the next delta negative. The snapshot is taken inside the lock so that
+   * baselines only move forward. Incrementing the counters does not take this monitor, so it is not blocked.
+   */
+  public Map<String, Long> getSuccessfulTaskCount()
+  {
+    synchronized (totalSuccessfulTaskCount) {
+      final Map<String, Long> total = totalSuccessfulTaskCount.entrySet()
+                                                              .stream()
+                                                              .collect(Collectors.toMap(
+                                                                  Map.Entry::getKey,
+                                                                  e -> e.getValue().get()
+                                                              ));
+      final Map<String, Long> delta = getDeltaValues(total, prevTotalSuccessfulTaskCount);
+      prevTotalSuccessfulTaskCount = total;
+      return delta;
+    }
+  }
+
+  /**
+   * Returns, per datasource, how many tasks completed unsuccessfully since the previous call.
+   * <p>
+   * Each call snapshots the cumulative counters, diffs them against the previous snapshot and then stores the
+   * new snapshot as the baseline. That read-modify-write is serialized on {@code totalFailedTaskCount}:
+   * without it, two concurrent callers could both report the same failures, or an older snapshot could
+   * overwrite a newer baseline and make the next delta negative. The snapshot is taken inside the lock so that
+   * baselines only move forward. Incrementing the counters does not take this monitor, so it is not blocked.
+   */
+  public Map<String, Long> getFailedTaskCount()
+  {
+    synchronized (totalFailedTaskCount) {
+      final Map<String, Long> total = totalFailedTaskCount.entrySet()
+                                                          .stream()
+                                                          .collect(Collectors.toMap(
+                                                              Map.Entry::getKey,
+                                                              e -> e.getValue().get()
+                                                          ));
+      final Map<String, Long> delta = getDeltaValues(total, prevTotalFailedTaskCount);
+      prevTotalFailedTaskCount = total;
+      return delta;
+    }
+  }
+
+ public Map<String, Long> getRunningTaskCount()
+ {
+ Map<String, String> taskDatasources = tasks.stream().collect(Collectors.toMap(Task::getId, Task::getDataSource));
+ return taskRunner.getRunningTasks()
+ .stream()
+ .collect(Collectors.toMap(
+ e -> taskDatasources.getOrDefault(e.getTaskId(), ""),
+ e -> 1L,
+ Long::sum
+ ));
+ }
+
+ public Map<String, Long> getPendingTaskCount()
+ {
+ Map<String, String> taskDatasources = tasks.stream().collect(Collectors.toMap(Task::getId, Task::getDataSource));
+ return taskRunner.getPendingTasks()
+ .stream()
+ .collect(Collectors.toMap(
+ e -> taskDatasources.getOrDefault(e.getTaskId(), ""),
+ e -> 1L,
+ Long::sum
+ ));
+ }
+
+ public Map<String, Long> getWaitingTaskCount()
+ {
+ Set<String> runnerKnownTaskIds = taskRunner.getKnownTasks()
+ .stream()
+ .map(TaskRunnerWorkItem::getTaskId)
+ .collect(Collectors.toSet());
+ return tasks.stream().filter(task -> !runnerKnownTaskIds.contains(task.getId()))
+ .collect(Collectors.toMap(Task::getDataSource, task -> 1L, Long::sum));
+ }
}
diff --git a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java
new file mode 100644
index 0000000..6c3394e
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.google.inject.Inject;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.metrics.AbstractMonitor;
+
+import java.util.Map;
+
+/**
+ * Monitor that emits per-datasource task counts (successful, failed, running, pending, waiting) obtained from a
+ * {@link TaskCountStatsProvider}.
+ */
+public class TaskCountStatsMonitor extends AbstractMonitor
+{
+  private final TaskCountStatsProvider statsProvider;
+
+  @Inject
+  public TaskCountStatsMonitor(TaskCountStatsProvider statsProvider)
+  {
+    this.statsProvider = statsProvider;
+  }
+
+  /**
+   * Emits the five task-count metrics, in a fixed order, with one event per datasource for each metric.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public boolean doMonitor(ServiceEmitter emitter)
+  {
+    emit(emitter, "task/success/count", statsProvider.getSuccessfulTaskCount());
+    emit(emitter, "task/failed/count", statsProvider.getFailedTaskCount());
+    emit(emitter, "task/running/count", statsProvider.getRunningTaskCount());
+    emit(emitter, "task/pending/count", statsProvider.getPendingTaskCount());
+    emit(emitter, "task/waiting/count", statsProvider.getWaitingTaskCount());
+    return true;
+  }
+
+  /**
+   * Emits {@code metric} once per entry of {@code countsByDataSource}, using the entry key as the
+   * {@code dataSource} dimension and the entry value as the metric value. A {@code null} map emits nothing.
+   */
+  private void emit(ServiceEmitter emitter, String metric, Map<String, Long> countsByDataSource)
+  {
+    if (countsByDataSource == null) {
+      return;
+    }
+    final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
+    for (Map.Entry<String, Long> entry : countsByDataSource.entrySet()) {
+      // setDimension overwrites the previous entry's value, so one builder can be reused across entries.
+      builder.setDimension("dataSource", entry.getKey());
+      emitter.emit(builder.build(metric, entry.getValue()));
+    }
+  }
+}
diff --git a/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java
similarity index 51%
copy from core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
copy to server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java
index 8a41db5..a96f8ce 100644
--- a/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
+++ b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java
@@ -17,46 +17,34 @@
* under the License.
*/
-package org.apache.druid.java.util.metrics;
+package org.apache.druid.server.metrics;
-import org.apache.druid.java.util.emitter.core.Event;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import java.util.Map;
-import java.util.ArrayList;
-import java.util.List;
-
-class StubServiceEmitter extends ServiceEmitter
+public interface TaskCountStatsProvider
{
- private List<Event> events = new ArrayList<>();
-
- public StubServiceEmitter(String service, String host)
- {
- super(service, host, null);
- }
-
- @Override
- public void emit(Event event)
- {
- events.add(event);
- }
-
- public List<Event> getEvents()
- {
- return events;
- }
-
- @Override
- public void start()
- {
- }
-
- @Override
- public void flush()
- {
- }
-
- @Override
- public void close()
- {
- }
+  /**
+   * Return the number of successful tasks for each datasource during the emission period.
+   * May return {@code null} when no counts are available; callers must handle that.
+   */
+  Map<String, Long> getSuccessfulTaskCount();
+
+  /**
+   * Return the number of failed tasks for each datasource during the emission period.
+   * May return {@code null} when no counts are available; callers must handle that.
+   */
+  Map<String, Long> getFailedTaskCount();
+
+  /**
+   * Return the number of currently running tasks for each datasource.
+   * May return {@code null} when no counts are available; callers must handle that.
+   */
+  Map<String, Long> getRunningTaskCount();
+
+  /**
+   * Return the number of currently pending tasks for each datasource.
+   * May return {@code null} when no counts are available; callers must handle that.
+   */
+  Map<String, Long> getPendingTaskCount();
+
+  /**
+   * Return the number of currently waiting tasks for each datasource.
+   * May return {@code null} when no counts are available; callers must handle that.
+   */
+  Map<String, Long> getWaitingTaskCount();
}
diff --git a/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java
new file mode 100644
index 0000000..24d89ab
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class TaskCountStatsMonitorTest
+{
+ private TaskCountStatsProvider statsProvider;
+
+ @Before
+ public void setUp()
+ {
+ statsProvider = new TaskCountStatsProvider()
+ {
+ @Override
+ public Map<String, Long> getSuccessfulTaskCount()
+ {
+ return ImmutableMap.of("d1", 1L);
+ }
+
+ @Override
+ public Map<String, Long> getFailedTaskCount()
+ {
+ return ImmutableMap.of("d1", 1L);
+ }
+
+ @Override
+ public Map<String, Long> getRunningTaskCount()
+ {
+ return ImmutableMap.of("d1", 1L);
+ }
+
+ @Override
+ public Map<String, Long> getPendingTaskCount()
+ {
+ return ImmutableMap.of("d1", 1L);
+ }
+
+ @Override
+ public Map<String, Long> getWaitingTaskCount()
+ {
+ return ImmutableMap.of("d1", 1L);
+ }
+ };
+ }
+
+ @Test
+ public void testMonitor()
+ {
+ final TaskCountStatsMonitor monitor = new TaskCountStatsMonitor(statsProvider);
+ final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+ monitor.doMonitor(emitter);
+ Assert.assertEquals(5, emitter.getEvents().size());
+ Assert.assertEquals("task/success/count", emitter.getEvents().get(0).toMap().get("metric"));
+ Assert.assertEquals(1L, emitter.getEvents().get(0).toMap().get("value"));
+ Assert.assertEquals("task/failed/count", emitter.getEvents().get(1).toMap().get("metric"));
+ Assert.assertEquals(1L, emitter.getEvents().get(1).toMap().get("value"));
+ Assert.assertEquals("task/running/count", emitter.getEvents().get(2).toMap().get("metric"));
+ Assert.assertEquals(1L, emitter.getEvents().get(2).toMap().get("value"));
+ Assert.assertEquals("task/pending/count", emitter.getEvents().get(3).toMap().get("metric"));
+ Assert.assertEquals(1L, emitter.getEvents().get(3).toMap().get("value"));
+ Assert.assertEquals("task/waiting/count", emitter.getEvents().get(4).toMap().get("metric"));
+ Assert.assertEquals(1L, emitter.getEvents().get(4).toMap().get("value"));
+ }
+}
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index 038ad83..5407221 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -100,6 +100,7 @@ import org.apache.druid.server.http.RedirectInfo;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.initialization.jetty.JettyServerInitUtils;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
+import org.apache.druid.server.metrics.TaskCountStatsProvider;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthenticationUtils;
import org.apache.druid.server.security.Authenticator;
@@ -170,6 +171,7 @@ public class CliOverlord extends ServerRunnable
JsonConfigProvider.bind(binder, "druid.indexer.auditlog", TaskAuditLogConfig.class);
binder.bind(TaskMaster.class).in(ManageLifecycle.class);
+ binder.bind(TaskCountStatsProvider.class).to(TaskMaster.class);
binder.bind(TaskLogStreamer.class).to(SwitchingTaskLogStreamer.class).in(LazySingleton.class);
binder.bind(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org