You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/02/05 09:53:57 UTC
[incubator-pinot] 01/01: Adding metrics for minion tasks status
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch report_pending_tasks_number
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 56c792532b7d92bca034f4a926642ef1c9bb0504
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Fri Feb 5 01:53:26 2021 -0800
Adding metrics for minion tasks status
---
.../etc/jmx_prometheus_javaagent/configs/pinot.yml | 5 +++
.../pinot/common/metrics/ControllerGauge.java | 5 ++-
.../helix/core/minion/PinotTaskManager.java | 41 ++++++++++++++++++
.../helix/core/minion/TaskTypeMetricsUpdater.java | 50 ++++++++++++++++++++++
.../tests/SimpleMinionClusterIntegrationTest.java | 49 ++++++++++++++++++---
5 files changed, 142 insertions(+), 8 deletions(-)
diff --git a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml
index 0221fde..40cd394 100644
--- a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml
+++ b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml
@@ -67,6 +67,11 @@ rules:
labels:
table: "$1"
taskType: "$2"
+- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.taskStatus.(\\w+)\\.(\\w+)\"><>(\\w+)"
+ name: "pinot_controller_taskStatus_$3"
+ labels:
+ taskType: "$1"
+ status: "$2"
# Pinot Broker
- pattern: "\"org.apache.pinot.common.metrics\"<type=\"BrokerMetrics\", name=\"pinot.broker.(\\w+).authorization\"><>(\\w+)"
name: "pinot_broker_authorization_$2"
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 3f69f45..c4990a5 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -67,7 +67,10 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT("TableStorageEstMissingSegmentPercent", false),
// Number of scheduled Cron jobs
- CRON_SCHEDULER_JOB_SCHEDULED("cronSchedulerJobScheduled", false);
+ CRON_SCHEDULER_JOB_SCHEDULED("cronSchedulerJobScheduled", false),
+
+ // Number of minion tasks in each task state, reported per task type
+ TASK_STATUS("taskStatus", false);
private final String gaugeName;
private final String unit;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 0a899c5..71f78a6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
+import org.apache.helix.task.TaskState;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
@@ -72,12 +73,15 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
private static final String TABLE_CONFIG_PARENT_PATH = "/CONFIGS/TABLE";
private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
+ private static final String TASK_QUEUE_PATH_PATTERN = "/TaskRebalancer/TaskQueue_%s/Context";
private final PinotHelixTaskResourceManager _helixTaskResourceManager;
private final ClusterInfoAccessor _clusterInfoAccessor;
private final TaskGeneratorRegistry _taskGeneratorRegistry;
private final Map<String, Map<String, String>> _tableTaskTypeToCronExpressionMap = new ConcurrentHashMap<>();
private final Map<String, TableTaskSchedulerUpdater> _tableTaskSchedulerUpdaterMap = new ConcurrentHashMap<>();
+ private final Map<String, TaskTypeMetricsUpdater> _taskTypeMetricsUpdaterMap = new ConcurrentHashMap<>();
+ private final Map<TaskState, Integer> _taskStateToCountMap = new ConcurrentHashMap<>();
private Scheduler _scheduledExecutorService = null;
@@ -125,6 +129,10 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
return TABLE_CONFIG_PATH_PREFIX + tableWithType;
}
+  /**
+   * Builds the property-store path of the task queue context node that is watched for the given task type.
+   *
+   * <p>NOTE(review): the path is derived from {@code TASK_QUEUE_PATH_PATTERN}, which hard-codes the
+   * {@code TaskQueue_<taskType>} queue name; keep it in sync with how the task queues are actually named.
+   *
+   * @param taskType minion task type substituted into the pattern
+   * @return the formatted context path for that task type's queue
+   */
+  private String getPropertyStorePathForTaskQueue(String taskType) {
+    String taskQueueContextPath = String.format(TASK_QUEUE_PATH_PATTERN, taskType);
+    return taskQueueContextPath;
+  }
+
public synchronized void cleanUpCronTaskSchedulerForTable(String tableWithType) {
LOGGER.info("Cleaning up task in scheduler for table {}", tableWithType);
TableTaskSchedulerUpdater tableTaskSchedulerUpdater = _tableTaskSchedulerUpdaterMap.get(tableWithType);
@@ -369,6 +377,7 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
String taskType = entry.getKey();
List<TableConfig> enabledTableConfigs = entry.getValue();
PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
+ scheduleTaskTypeMetricsUpdaterIfNeeded(taskType);
if (taskGenerator != null) {
_helixTaskResourceManager.ensureTaskQueueExists(taskType);
tasksScheduled.put(taskType, scheduleTask(taskGenerator, enabledTableConfigs, isLeader));
@@ -438,6 +447,7 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
}
}
+ scheduleTaskTypeMetricsUpdaterIfNeeded(taskType);
_helixTaskResourceManager.ensureTaskQueueExists(taskType);
return scheduleTask(taskGenerator, enabledTableConfigs, false);
}
@@ -458,6 +468,7 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
.checkState(tableConfig.getTaskConfig() != null && tableConfig.getTaskConfig().isTaskTypeEnabled(taskType),
"Table: %s does not have task type: %s enabled", tableNameWithType, taskType);
+ scheduleTaskTypeMetricsUpdaterIfNeeded(taskType);
_helixTaskResourceManager.ensureTaskQueueExists(taskType);
return scheduleTask(taskGenerator, Collections.singletonList(tableConfig), false);
}
@@ -478,4 +489,34 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
public Scheduler getScheduler() {
return _scheduledExecutorService;
}
+
+  /**
+   * Publishes the per-state task counts of the given task type as {@link ControllerGauge#TASK_STATUS} gauges.
+   *
+   * <p>One gauge is set for every {@link TaskState}, keyed {@code <taskType>.<taskState>}; states with no task in
+   * them are set to 0. Counts are computed from scratch on every call, so the values reported for one task type never
+   * depend on which states other task types have reached, and a state that all tasks have left drops back to 0.
+   *
+   * @param taskType minion task type whose task states are read via {@code getTaskStates(taskType)}
+   */
+  public synchronized void reportMetrics(String taskType) {
+    Map<String, TaskState> taskStates = _helixTaskResourceManager.getTaskStates(taskType);
+    Map<TaskState, Integer> taskStateToCountMap = new HashMap<>();
+    for (TaskState taskState : taskStates.values()) {
+      // NOTE(review): a task whose state is not recorded yet may map to null; such a task is not counted in any state
+      if (taskState != null) {
+        taskStateToCountMap.merge(taskState, 1, Integer::sum);
+      }
+    }
+    // Emit every state (not only the observed ones) so each gauge series exists and resets to 0 deterministically
+    for (TaskState taskState : TaskState.values()) {
+      _controllerMetrics.setValueOfTableGauge(String.format("%s.%s", taskType, taskState), ControllerGauge.TASK_STATUS,
+          taskStateToCountMap.getOrDefault(taskState, 0));
+    }
+  }
+
+  /**
+   * Registers, at most once per task type, a {@link TaskTypeMetricsUpdater} that listens for data changes on the
+   * task queue context path of {@code taskType} in the property store.
+   *
+   * <p>The registered listener is remembered in {@code _taskTypeMetricsUpdaterMap}, which is what makes repeated calls
+   * for the same task type a no-op.
+   *
+   * @param taskType minion task type whose task-status metrics should be kept up to date
+   */
+  private synchronized void scheduleTaskTypeMetricsUpdaterIfNeeded(String taskType) {
+    if (_taskTypeMetricsUpdaterMap.containsKey(taskType)) {
+      return;
+    }
+    TaskTypeMetricsUpdater metricsUpdater = new TaskTypeMetricsUpdater(taskType, this);
+    String taskQueueContextPath = getPropertyStorePathForTaskQueue(taskType);
+    _pinotHelixResourceManager.getPropertyStore().subscribeDataChanges(taskQueueContextPath, metricsUpdater);
+    _taskTypeMetricsUpdaterMap.put(taskType, metricsUpdater);
+  }
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskTypeMetricsUpdater.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskTypeMetricsUpdater.java
new file mode 100644
index 0000000..f4ebc8e
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskTypeMetricsUpdater.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion;
+
+import org.I0Itec.zkclient.IZkDataListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TaskTypeMetricsUpdater implements IZkDataListener {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TaskTypeMetricsUpdater.class);
+ private final String _taskType;
+ private final PinotTaskManager _pinotTaskManager;
+
+ public TaskTypeMetricsUpdater(String taskType, PinotTaskManager pinotTaskManager) {
+ _taskType = taskType;
+ _pinotTaskManager = pinotTaskManager;
+ }
+
+ @Override
+ public void handleDataChange(String dataPath, Object data)
+ throws Exception {
+ try {
+ _pinotTaskManager.reportMetrics(_taskType);
+ } catch (Exception e) {
+ LOGGER.error("Failed to update metrics for task type {}", _taskType, e);
+ throw e;
+ }
+ }
+
+ @Override
+ public void handleDataDeleted(String dataPath) {
+ }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index b2c2674..120ed89 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
@@ -45,11 +46,16 @@ import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
/**
@@ -62,6 +68,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
private static final String TABLE_NAME_2 = "testTable2";
private static final String TABLE_NAME_3 = "testTable3";
private static final long STATE_TRANSITION_TIMEOUT_MS = 60_000L; // 1 minute
+ private static final int NUM_TASKS = 2;
private static final AtomicBoolean HOLD = new AtomicBoolean();
private static final AtomicBoolean TASK_START_NOTIFIED = new AtomicBoolean();
@@ -120,7 +127,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
// Wait at most 60 seconds for all tasks IN_PROGRESS
TestUtils.waitForCondition(input -> {
Collection<TaskState> taskStates = _helixTaskResourceManager.getTaskStates(TASK_TYPE).values();
- assertEquals(taskStates.size(), 2);
+ assertEquals(taskStates.size(), NUM_TASKS);
for (TaskState taskState : taskStates) {
if (taskState != TaskState.IN_PROGRESS) {
return false;
@@ -133,13 +140,20 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
return true;
}, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks IN_PROGRESS");
+ Assert.assertEquals(_controllerStarter.getControllerMetrics()
+ .getValueOfTableGauge(TASK_TYPE + "." + TaskState.IN_PROGRESS, ControllerGauge.TASK_STATUS), NUM_TASKS);
+ Assert.assertEquals(_controllerStarter.getControllerMetrics()
+ .getValueOfTableGauge(TASK_TYPE + "." + TaskState.COMPLETED, ControllerGauge.TASK_STATUS), 0);
+ Assert.assertEquals(_controllerStarter.getControllerMetrics()
+ .getValueOfTableGauge(TASK_TYPE + "." + TaskState.STOPPED, ControllerGauge.TASK_STATUS), 0);
+
// Stop the task queue
_helixTaskResourceManager.stopTaskQueue(TASK_TYPE);
// Wait at most 60 seconds for all tasks STOPPED
TestUtils.waitForCondition(input -> {
Collection<TaskState> taskStates = _helixTaskResourceManager.getTaskStates(TASK_TYPE).values();
- assertEquals(taskStates.size(), 2);
+ assertEquals(taskStates.size(), NUM_TASKS);
for (TaskState taskState : taskStates) {
if (taskState != TaskState.STOPPED) {
return false;
@@ -152,6 +166,13 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
return true;
}, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks STOPPED");
+ Assert.assertEquals(_controllerStarter.getControllerMetrics()
+ .getValueOfTableGauge(TASK_TYPE + "." + TaskState.IN_PROGRESS, ControllerGauge.TASK_STATUS), 0);
+ Assert.assertEquals(_controllerStarter.getControllerMetrics()
+ .getValueOfTableGauge(TASK_TYPE + "." + TaskState.COMPLETED, ControllerGauge.TASK_STATUS), 0);
+ Assert.assertEquals(_controllerStarter.getControllerMetrics()
+ .getValueOfTableGauge(TASK_TYPE + "." + TaskState.STOPPED, ControllerGauge.TASK_STATUS), NUM_TASKS);
+
// Resume the task queue, and let the task complete
_helixTaskResourceManager.resumeTaskQueue(TASK_TYPE);
HOLD.set(false);
@@ -159,7 +180,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
// Wait at most 60 seconds for all tasks COMPLETED
TestUtils.waitForCondition(input -> {
Collection<TaskState> taskStates = _helixTaskResourceManager.getTaskStates(TASK_TYPE).values();
- assertEquals(taskStates.size(), 2);
+ assertEquals(taskStates.size(), NUM_TASKS);
for (TaskState taskState : taskStates) {
if (taskState != TaskState.COMPLETED) {
return false;
@@ -172,12 +193,26 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
return true;
}, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks COMPLETED");
+ Assert.assertEquals(_controllerStarter.getControllerMetrics()
+ .getValueOfTableGauge(TASK_TYPE + "." + TaskState.IN_PROGRESS, ControllerGauge.TASK_STATUS), 0);
+ Assert.assertEquals(_controllerStarter.getControllerMetrics()
+ .getValueOfTableGauge(TASK_TYPE + "." + TaskState.COMPLETED, ControllerGauge.TASK_STATUS), NUM_TASKS);
+ Assert.assertEquals(_controllerStarter.getControllerMetrics()
+ .getValueOfTableGauge(TASK_TYPE + "." + TaskState.STOPPED, ControllerGauge.TASK_STATUS), 0);
+
// Delete the task queue
_helixTaskResourceManager.deleteTaskQueue(TASK_TYPE, false);
// Wait at most 60 seconds for task queue to be deleted
TestUtils.waitForCondition(input -> !_helixTaskResourceManager.getTaskTypes().contains(TASK_TYPE),
STATE_TRANSITION_TIMEOUT_MS, "Failed to delete the task queue");
+
+ Assert.assertEquals(_controllerStarter.getControllerMetrics()
+ .getValueOfTableGauge(TASK_TYPE + "." + TaskState.IN_PROGRESS, ControllerGauge.TASK_STATUS), 0);
+ Assert.assertEquals(_controllerStarter.getControllerMetrics()
+ .getValueOfTableGauge(TASK_TYPE + "." + TaskState.COMPLETED, ControllerGauge.TASK_STATUS), NUM_TASKS);
+ Assert.assertEquals(_controllerStarter.getControllerMetrics()
+ .getValueOfTableGauge(TASK_TYPE + "." + TaskState.STOPPED, ControllerGauge.TASK_STATUS), 0);
}
@AfterClass
@@ -209,10 +244,10 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
@Override
public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
- assertEquals(tableConfigs.size(), 2);
+ assertEquals(tableConfigs.size(), NUM_TASKS);
// Generate at most 2 tasks
- if (_clusterInfoAccessor.getTaskStates(TASK_TYPE).size() >= 2) {
+ if (_clusterInfoAccessor.getTaskStates(TASK_TYPE).size() >= NUM_TASKS) {
return Collections.emptyList();
}
@@ -249,7 +284,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
assertEquals(pinotTaskConfig.getTaskType(), TASK_TYPE);
Map<String, String> configs = pinotTaskConfig.getConfigs();
- assertEquals(configs.size(), 2);
+ assertEquals(configs.size(), NUM_TASKS);
String offlineTableName = configs.get("tableName");
assertEquals(TableNameBuilder.getTableTypeFromTableName(offlineTableName), TableType.OFFLINE);
String rawTableName = TableNameBuilder.extractRawTableName(offlineTableName);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org