You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by jo...@apache.org on 2022/04/26 16:44:59 UTC
[druid] branch master updated: Worker level task metrics (#12446)
This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 564d6defd4 Worker level task metrics (#12446)
564d6defd4 is described below
commit 564d6defd47749d55dd07e5549d7264cbc1c4019
Author: zachjsh <za...@gmail.com>
AuthorDate: Tue Apr 26 12:44:44 2022 -0400
Worker level task metrics (#12446)
* * fix metric name inconsistency
* * add task slot metrics for middle managers
* * add new WorkerTaskCountStatsMonitor to report task count metrics
from worker
* * more stuff
* * remove unused variable
* * more stuff
* * add javadocs
* * fix checkstyle
* * fix hadoop test failure
* * cleanup
* * add more code coverage in tests
* * fix test failure
* * add docs
* * increase code coverage
* * fix spelling
* * fix failing tests
* * remove dead code
* * fix spelling
---
docs/configuration/index.md | 2 +
docs/operations/metrics.md | 5 +
.../main/resources/defaultMetricDimensions.json | 8 +-
.../druid/indexing/overlord/ForkingTaskRunner.java | 73 ++++++-
.../indexing/overlord/ForkingTaskRunnerTest.java | 16 ++
.../metrics/WorkerTaskCountStatsMonitor.java | 79 ++++++++
.../metrics/WorkerTaskCountStatsProvider.java | 63 ++++++
.../metrics/WorkerTaskCountStatsMonitorTest.java | 216 +++++++++++++++++++++
.../org/apache/druid/cli/CliMiddleManager.java | 2 +
website/.spelling | 1 +
10 files changed, 454 insertions(+), 11 deletions(-)
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index f4c268ae9d..e598b36ea0 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -383,6 +383,8 @@ Metric monitoring is an essential part of Druid operations. The following monit
|`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted.|
|`org.apache.druid.server.emitter.HttpEmittingMonitor`|Reports internal metrics of `http` or `parametrized` emitter (see below). Must not be used with another emitter type. See the description of the metrics here: https://github.com/apache/druid/pull/4973.|
|`org.apache.druid.server.metrics.TaskCountStatsMonitor`|Reports how many ingestion tasks are currently running/pending/waiting and also the number of successful/failed tasks per emission period.|
+|`org.apache.druid.server.metrics.TaskSlotCountStatsMonitor`|Reports metrics about task slot usage per emission period.|
+|`org.apache.druid.server.metrics.WorkerTaskCountStatsMonitor`|Reports the number of successful/failed tasks and metrics about task slot usage (idle/total/used) for the reporting worker, per emission period. Only supported by middleManager node types.|
For example, you might configure monitors on all processes for system and JVM information within `common.runtime.properties` as follows:
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index a5894ed005..8124acd1c9 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -214,6 +214,11 @@ Note: If the JVM does not support CPU time measurement for the current thread, i
|`taskSlot/lazy/count`|Number of total task slots in lazy marked MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
|`taskSlot/blacklisted/count`|Number of total task slots in blacklisted MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
|`task/segmentAvailability/wait/time`|The amount of milliseconds a batch indexing task waited for newly created segments to become available for querying.|dataSource, taskType, taskId, segmentAvailabilityConfirmed|Varies.|
+|`worker/task/failed/count`|Number of failed tasks run on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included, and is only supported for middleManager nodes.|category, version.|Varies.|
+|`worker/task/success/count`|Number of successful tasks run on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included, and is only supported for middleManager nodes.|category, version.|Varies.|
+|`worker/taskSlot/idle/count`|Number of idle task slots on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included, and is only supported for middleManager nodes.|category, version.|Varies.|
+|`worker/taskSlot/total/count`|Number of total task slots on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included, and is only supported for middleManager nodes.|category, version.|Varies.|
+|`worker/taskSlot/used/count`|Number of busy task slots on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included, and is only supported for middleManager nodes.|category, version.|Varies.|
## Shuffle metrics (Native parallel task)
diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
index 6c0fa75d1a..a1f5a85479 100644
--- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
+++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
@@ -63,9 +63,15 @@
"task/pending/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
"task/waiting/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
+ "worker/task/failed/count" : { "dimensions" : ["category", "version"], "type" : "count" },
+ "worker/task/success/count" : { "dimensions" : ["category", "version"], "type" : "count" },
+ "worker/taskSlot/idle/count" : { "dimensions" : ["category", "version"], "type" : "gauge" },
+ "worker/taskSlot/total/count" : { "dimensions" : ["category", "version"], "type" : "gauge" },
+ "worker/taskSlot/used/count" : { "dimensions" : ["category", "version"], "type" : "gauge" },
+
"taskSlot/total/count" : { "dimensions" : ["category"], "type" : "gauge" },
"taskSlot/idle/count" : { "dimensions" : ["category"], "type" : "gauge" },
- "taskSlot/busy/count" : { "dimensions" : ["category"], "type" : "gauge" },
+ "taskSlot/used/count" : { "dimensions" : ["category"], "type" : "gauge" },
"taskSlot/lazy/count" : { "dimensions" : ["category"], "type" : "gauge" },
"taskSlot/blacklisted/count" : { "dimensions" : ["category"], "type" : "gauge" },
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index c35e10cb25..77c515502d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -63,6 +63,7 @@ import org.apache.druid.query.DruidMetrics;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
import org.apache.druid.server.metrics.MonitorsConfig;
+import org.apache.druid.server.metrics.WorkerTaskCountStatsProvider;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.joda.time.DateTime;
@@ -83,13 +84,14 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Runs tasks in separate processes using the "internal peon" verb.
*/
public class ForkingTaskRunner
extends BaseRestorableTaskRunner<ForkingTaskRunner.ForkingTaskRunnerWorkItem>
- implements TaskLogStreamer
+ implements TaskLogStreamer, WorkerTaskCountStatsProvider
{
private static final EmittingLogger LOGGER = new EmittingLogger(ForkingTaskRunner.class);
private static final String CHILD_PROPERTY_PREFIX = "druid.indexer.fork.property.";
@@ -104,6 +106,11 @@ public class ForkingTaskRunner
private volatile boolean stopping = false;
+ private static final AtomicLong LAST_REPORTED_FAILED_TASK_COUNT = new AtomicLong();
+ private static final AtomicLong FAILED_TASK_COUNT = new AtomicLong();
+ private static final AtomicLong SUCCESSFUL_TASK_COUNT = new AtomicLong();
+ private static final AtomicLong LAST_REPORTED_SUCCESSFUL_TASK_COUNT = new AtomicLong();
+
@Inject
public ForkingTaskRunner(
ForkingTaskRunnerConfig config,
@@ -399,7 +406,11 @@ public class ForkingTaskRunner
)
);
}
-
+ if (status.isSuccess()) {
+ SUCCESSFUL_TASK_COUNT.incrementAndGet();
+ } else {
+ FAILED_TASK_COUNT.incrementAndGet();
+ }
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
return status;
}
@@ -690,18 +701,12 @@ public class ForkingTaskRunner
@Override
public Map<String, Long> getTotalTaskSlotCount()
{
- if (config.getPorts() != null && !config.getPorts().isEmpty()) {
- return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(config.getPorts().size()));
- }
- return ImmutableMap.of(workerConfig.getCategory(), Long.valueOf(config.getEndPort() - config.getStartPort() + 1));
+ return ImmutableMap.of(workerConfig.getCategory(), getTotalTaskSlotCountLong());
}
public long getTotalTaskSlotCountLong()
{
- if (config.getPorts() != null && !config.getPorts().isEmpty()) {
- return config.getPorts().size();
- }
- return config.getEndPort() - config.getStartPort() + 1;
+ return workerConfig.getCapacity();
}
@Override
@@ -733,6 +738,54 @@ public class ForkingTaskRunner
return ImmutableMap.of(workerConfig.getCategory(), 0L);
}
+ @Override
+ public Long getWorkerFailedTaskCount()
+ {
+ long failedTaskCount = FAILED_TASK_COUNT.get();
+ long lastReportedFailedTaskCount = LAST_REPORTED_FAILED_TASK_COUNT.get();
+ LAST_REPORTED_FAILED_TASK_COUNT.set(failedTaskCount);
+ return failedTaskCount - lastReportedFailedTaskCount;
+ }
+
+ @Override
+ public Long getWorkerIdleTaskSlotCount()
+ {
+ return Math.max(getTotalTaskSlotCountLong() - getUsedTaskSlotCountLong(), 0);
+ }
+
+ @Override
+ public Long getWorkerUsedTaskSlotCount()
+ {
+ return (long) portFinder.findUsedPortCount();
+ }
+
+ @Override
+ public Long getWorkerTotalTaskSlotCount()
+ {
+ return getTotalTaskSlotCountLong();
+ }
+
+ @Override
+ public String getWorkerCategory()
+ {
+ return workerConfig.getCategory();
+ }
+
+ @Override
+ public String getWorkerVersion()
+ {
+ return workerConfig.getVersion();
+ }
+
+ @Override
+ public Long getWorkerSuccessfulTaskCount()
+ {
+ long successfulTaskCount = SUCCESSFUL_TASK_COUNT.get();
+ long lastReportedSuccessfulTaskCount = LAST_REPORTED_SUCCESSFUL_TASK_COUNT.get();
+ LAST_REPORTED_SUCCESSFUL_TASK_COUNT.set(successfulTaskCount);
+ return successfulTaskCount - lastReportedSuccessfulTaskCount;
+ }
+
protected static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem
{
private final Task task;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
index ae5c49d4af..2ba8e9d59d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
@@ -231,6 +231,12 @@ public class ForkingTaskRunnerTest
@Override
int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile)
{
+ WorkerConfig workerConfig = new WorkerConfig();
+ Assert.assertEquals(1L, (long) this.getWorkerUsedTaskSlotCount());
+ Assert.assertEquals(workerConfig.getCapacity(), (long) this.getWorkerTotalTaskSlotCount());
+ Assert.assertEquals(workerConfig.getCapacity() - 1, (long) this.getWorkerIdleTaskSlotCount());
+ Assert.assertEquals(workerConfig.getCategory(), this.getWorkerCategory());
+ Assert.assertEquals(workerConfig.getVersion(), this.getWorkerVersion());
// Emulate task process failure
return 1;
}
@@ -242,6 +248,8 @@ public class ForkingTaskRunnerTest
"Task execution process exited unsuccessfully with code[1]. See middleManager logs for more details.",
status.getErrorMsg()
);
+ Assert.assertEquals(1L, (long) forkingTaskRunner.getWorkerFailedTaskCount());
+ Assert.assertEquals(0L, (long) forkingTaskRunner.getWorkerSuccessfulTaskCount());
}
@Test
@@ -294,6 +302,12 @@ public class ForkingTaskRunnerTest
@Override
int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile)
{
+ WorkerConfig workerConfig = new WorkerConfig();
+ Assert.assertEquals(1L, (long) this.getWorkerUsedTaskSlotCount());
+ Assert.assertEquals(workerConfig.getCapacity(), (long) this.getWorkerTotalTaskSlotCount());
+ Assert.assertEquals(workerConfig.getCapacity() - 1, (long) this.getWorkerIdleTaskSlotCount());
+ Assert.assertEquals(workerConfig.getCategory(), this.getWorkerCategory());
+ Assert.assertEquals(workerConfig.getVersion(), this.getWorkerVersion());
return 0;
}
};
@@ -301,6 +315,8 @@ public class ForkingTaskRunnerTest
final TaskStatus status = forkingTaskRunner.run(task).get();
Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
Assert.assertNull(status.getErrorMsg());
+ Assert.assertEquals(0L, (long) forkingTaskRunner.getWorkerFailedTaskCount());
+ Assert.assertEquals(1L, (long) forkingTaskRunner.getWorkerSuccessfulTaskCount());
}
@Test
diff --git a/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java
new file mode 100644
index 0000000000..cfd01610a1
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.metrics.AbstractMonitor;
+
+import java.util.Set;
+
+public class WorkerTaskCountStatsMonitor extends AbstractMonitor
+{
+ private final WorkerTaskCountStatsProvider statsProvider;
+ private final String workerCategory;
+ private final String workerVersion;
+ private final boolean isMiddleManager;
+
+ @Inject
+ public WorkerTaskCountStatsMonitor(
+ Injector injector,
+ @Self Set<NodeRole> nodeRoles
+ )
+ {
+ this.isMiddleManager = nodeRoles.contains(NodeRole.MIDDLE_MANAGER);
+ if (isMiddleManager) {
+ this.statsProvider = injector.getInstance(WorkerTaskCountStatsProvider.class);
+ this.workerCategory = statsProvider.getWorkerCategory();
+ this.workerVersion = statsProvider.getWorkerVersion();
+ } else {
+ this.statsProvider = null;
+ this.workerCategory = null;
+ this.workerVersion = null;
+ }
+ }
+
+ @Override
+ public boolean doMonitor(ServiceEmitter emitter)
+ {
+ if (isMiddleManager) {
+ emit(emitter, "worker/task/failed/count", statsProvider.getWorkerFailedTaskCount());
+ emit(emitter, "worker/task/success/count", statsProvider.getWorkerSuccessfulTaskCount());
+ emit(emitter, "worker/taskSlot/idle/count", statsProvider.getWorkerIdleTaskSlotCount());
+ emit(emitter, "worker/taskSlot/total/count", statsProvider.getWorkerTotalTaskSlotCount());
+ emit(emitter, "worker/taskSlot/used/count", statsProvider.getWorkerUsedTaskSlotCount());
+ }
+ return true;
+ }
+
+ private void emit(ServiceEmitter emitter, String metricName, Long value)
+ {
+ if (value != null) {
+ final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
+ builder.setDimension("category", workerCategory);
+ builder.setDimension("version", workerVersion);
+ emitter.emit(builder.build(metricName, value));
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsProvider.java b/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsProvider.java
new file mode 100644
index 0000000000..3ca9df5624
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/metrics/WorkerTaskCountStatsProvider.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+/**
+ * Provides task / task count status at the level of individual worker nodes. These metrics
+ * are reported by workers, like middle-managers.
+ */
+public interface WorkerTaskCountStatsProvider
+{
+ /**
+ * The number of failed tasks run on worker during emission period.
+ */
+ Long getWorkerFailedTaskCount();
+
+ /**
+ * The number of successful tasks run on worker during emission period.
+ */
+ Long getWorkerSuccessfulTaskCount();
+
+ /**
+ * The number of idle task slots on worker.
+ */
+ Long getWorkerIdleTaskSlotCount();
+
+ /**
+ * The number of total task slots on worker.
+ */
+ Long getWorkerTotalTaskSlotCount();
+
+ /**
+ * The number of used task slots on worker.
+ */
+ Long getWorkerUsedTaskSlotCount();
+
+
+ /**
+ * The worker category.
+ */
+ String getWorkerCategory();
+
+ /**
+ * The worker version.
+ */
+ String getWorkerVersion();
+}
diff --git a/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java
new file mode 100644
index 0000000000..17702c0fad
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/metrics/WorkerTaskCountStatsMonitorTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+public class WorkerTaskCountStatsMonitorTest
+{
+ private Injector injectorForMiddleManager;
+ private Injector injectorForMiddleManagerNullStats;
+ private Injector injectorForPeon;
+
+ private WorkerTaskCountStatsProvider statsProvider;
+ private WorkerTaskCountStatsProvider nullStatsProvider;
+
+ @Before
+ public void setUp()
+ {
+ statsProvider = new WorkerTaskCountStatsProvider()
+ {
+ @Override
+ public Long getWorkerTotalTaskSlotCount()
+ {
+ return 5L;
+ }
+
+ @Override
+ public Long getWorkerFailedTaskCount()
+ {
+ return 4L;
+ }
+
+ @Override
+ public Long getWorkerIdleTaskSlotCount()
+ {
+ return 3L;
+ }
+
+ @Override
+ public Long getWorkerSuccessfulTaskCount()
+ {
+ return 2L;
+ }
+
+ @Override
+ public Long getWorkerUsedTaskSlotCount()
+ {
+ return 1L;
+ }
+
+ @Override
+ public String getWorkerCategory()
+ {
+ return "workerCategory";
+ }
+
+ @Override
+ public String getWorkerVersion()
+ {
+ return "workerVersion";
+ }
+ };
+
+ nullStatsProvider = new WorkerTaskCountStatsProvider()
+ {
+ @Nullable
+ @Override
+ public Long getWorkerTotalTaskSlotCount()
+ {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public Long getWorkerFailedTaskCount()
+ {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public Long getWorkerIdleTaskSlotCount()
+ {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public Long getWorkerSuccessfulTaskCount()
+ {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public Long getWorkerUsedTaskSlotCount()
+ {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public String getWorkerCategory()
+ {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public String getWorkerVersion()
+ {
+ return null;
+ }
+ };
+
+ injectorForMiddleManager = Guice.createInjector(
+ ImmutableList.of(
+ (Module) binder -> {
+ binder.bind(WorkerTaskCountStatsProvider.class).toInstance(statsProvider);
+ }
+ )
+ );
+
+ injectorForMiddleManagerNullStats = Guice.createInjector(
+ ImmutableList.of(
+ (Module) binder -> {
+ binder.bind(WorkerTaskCountStatsProvider.class).toInstance(nullStatsProvider);
+ }
+ )
+ );
+
+ injectorForPeon = Guice.createInjector(
+ ImmutableList.of(
+ (Module) binder -> {}
+ )
+ );
+ }
+
+ @Test
+ public void testMonitor()
+ {
+ final WorkerTaskCountStatsMonitor monitor =
+ new WorkerTaskCountStatsMonitor(injectorForMiddleManager, ImmutableSet.of(NodeRole.MIDDLE_MANAGER));
+ final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+ monitor.doMonitor(emitter);
+ Assert.assertEquals(5, emitter.getEvents().size());
+ Assert.assertEquals("worker/task/failed/count", emitter.getEvents().get(0).toMap().get("metric"));
+ Assert.assertEquals("workerCategory", emitter.getEvents().get(0).toMap().get("category"));
+ Assert.assertEquals("workerVersion", emitter.getEvents().get(0).toMap().get("version"));
+ Assert.assertEquals(4L, emitter.getEvents().get(0).toMap().get("value"));
+ Assert.assertEquals("worker/task/success/count", emitter.getEvents().get(1).toMap().get("metric"));
+ Assert.assertEquals("workerCategory", emitter.getEvents().get(1).toMap().get("category"));
+ Assert.assertEquals("workerVersion", emitter.getEvents().get(1).toMap().get("version"));
+ Assert.assertEquals(2L, emitter.getEvents().get(1).toMap().get("value"));
+ Assert.assertEquals("worker/taskSlot/idle/count", emitter.getEvents().get(2).toMap().get("metric"));
+ Assert.assertEquals("workerCategory", emitter.getEvents().get(2).toMap().get("category"));
+ Assert.assertEquals("workerVersion", emitter.getEvents().get(2).toMap().get("version"));
+ Assert.assertEquals(3L, emitter.getEvents().get(2).toMap().get("value"));
+ Assert.assertEquals("worker/taskSlot/total/count", emitter.getEvents().get(3).toMap().get("metric"));
+ Assert.assertEquals("workerCategory", emitter.getEvents().get(3).toMap().get("category"));
+ Assert.assertEquals("workerVersion", emitter.getEvents().get(3).toMap().get("version"));
+ Assert.assertEquals(5L, emitter.getEvents().get(3).toMap().get("value"));
+ Assert.assertEquals("worker/taskSlot/used/count", emitter.getEvents().get(4).toMap().get("metric"));
+ Assert.assertEquals("workerCategory", emitter.getEvents().get(4).toMap().get("category"));
+ Assert.assertEquals("workerVersion", emitter.getEvents().get(4).toMap().get("version"));
+ Assert.assertEquals(1L, emitter.getEvents().get(4).toMap().get("value"));
+ }
+
+ @Test
+ public void testMonitorWithNulls()
+ {
+ final WorkerTaskCountStatsMonitor monitor =
+ new WorkerTaskCountStatsMonitor(injectorForMiddleManagerNullStats, ImmutableSet.of(NodeRole.MIDDLE_MANAGER));
+ final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+ monitor.doMonitor(emitter);
+ Assert.assertEquals(0, emitter.getEvents().size());
+ }
+
+ @Test
+ public void testMonitorNotMiddleManager()
+ {
+ final WorkerTaskCountStatsMonitor monitor =
+ new WorkerTaskCountStatsMonitor(injectorForPeon, ImmutableSet.of(NodeRole.PEON));
+ final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
+ monitor.doMonitor(emitter);
+ Assert.assertEquals(0, emitter.getEvents().size());
+ }
+}
diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
index a005e3a48d..ed424ba037 100644
--- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
+++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
@@ -78,6 +78,7 @@ import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.http.SelfDiscoveryResource;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
+import org.apache.druid.server.metrics.WorkerTaskCountStatsProvider;
import org.apache.druid.timeline.PruneLastCompactionState;
import org.eclipse.jetty.server.Server;
@@ -137,6 +138,7 @@ public class CliMiddleManager extends ServerRunnable
binder.bind(TaskRunner.class).to(ForkingTaskRunner.class);
binder.bind(ForkingTaskRunner.class).in(LazySingleton.class);
+ binder.bind(WorkerTaskCountStatsProvider.class).to(ForkingTaskRunner.class);
binder.bind(IndexingServiceClient.class).to(HttpIndexingServiceClient.class).in(LazySingleton.class);
binder.bind(new TypeLiteral<IndexTaskClientFactory<ParallelIndexSupervisorTaskClient>>() {})
diff --git a/website/.spelling b/website/.spelling
index e83d1b8598..89ad85dd32 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1428,6 +1428,7 @@ Sys
SysMonitor
TaskCountStatsMonitor
TaskSlotCountStatsMonitor
+WorkerTaskCountStatsMonitor
bufferCapacity
bufferpoolName
cms
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org