You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by mc...@apache.org on 2021/06/28 21:01:29 UTC
[incubator-pinot] branch master updated: Added TaskMetricsEmitted
periodic controller job (#7091)
This is an automated email from the ASF dual-hosted git repository.
mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c950926 Added TaskMetricsEmitted periodic controller job (#7091)
c950926 is described below
commit c9509261672d0f9c2b35fa834eb8fe998ed23161
Author: Subbu Subramaniam <mc...@users.noreply.github.com>
AuthorDate: Mon Jun 28 14:01:14 2021 -0700
Added TaskMetricsEmitted periodic controller job (#7091)
* Added TaskMetricsEmitted periodic controller job
This job runs every 5 mins by default and emits metrics about Minion tasks
scheduled in Pinot. For now, the following metrics are emitted for each
task type:
- Number of running tasks
- Number of running sub-tasks
- Number of waiting sub-tasks (not yet assigned to a minion)
- Number of error sub-tasks (completed with an error/exception)
- Percent of sub-tasks in error
- Percent of sub-tasks in waiting or running states.
* Addressed review comments and added unit tests
* Addressed review comments and resolved conflicts
* Cleaned up with spotless:apply
* Handle another null case
* spotless apply
* Fix compile error after merge
* Added comments on the minion integration tests
---
.../pinot/common/metrics/ControllerGauge.java | 6 ++
.../apache/pinot/controller/ControllerConf.java | 12 +++
.../core/minion/PinotHelixTaskResourceManager.java | 118 ++++++++++++++++++++-
.../helix/core/minion/PinotTaskManager.java | 3 +-
.../helix/core/minion/TaskMetricsEmitter.java | 90 ++++++++++++++++
.../tests/SimpleMinionClusterIntegrationTest.java | 36 +++++--
6 files changed, 253 insertions(+), 12 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 1ebeac1..2b91ba0 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -45,6 +45,12 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
OFFLINE_TABLE_COUNT("TableCount", true),
DISABLED_TABLE_COUNT("TableCount", true),
PERIODIC_TASK_NUM_TABLES_PROCESSED("PeriodicTaskNumTablesProcessed", true),
+ NUM_MINION_TASKS_IN_PROGRESS("NumMinionTasksInProgress", true),
+ NUM_MINION_SUBTASKS_WAITING("NumMinionSubtasksWaiting", true),
+ NUM_MINION_SUBTASKS_RUNNING("NumMinionSubtasksRunning", true),
+ NUM_MINION_SUBTASKS_ERROR("NumMinionSubtasksError", true),
+ PERCENT_MINION_SUBTASKS_IN_QUEUE("PercentMinionSubtasksInQueue", true),
+ PERCENT_MINION_SUBTASKS_IN_ERROR("PercentMinionSubtasksInError", true),
// Pinot controller leader
PINOT_CONTROLLER_LEADER("PinotControllerLeader", true),
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 80bd173..7db3a27 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -98,6 +98,7 @@ public class ControllerConf extends PinotConfiguration {
"controller.minion.instances.cleanup.task.frequencyInSeconds";
public static final String MINION_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS =
"controller.minion.instances.cleanup.task.initialDelaySeconds";
+ public static final String TASK_METRICS_EMITTER_FREQUENCY_IN_SECONDS = "controller.minion.task.metrics.emitter.frequencyInSeconds";
public static final String PINOT_TASK_MANAGER_SCHEDULER_ENABLED = "controller.task.scheduler.enabled";
@Deprecated
@@ -140,6 +141,7 @@ public class ControllerConf extends PinotConfiguration {
private static final int DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
private static final int DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
private static final int DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS = 5 * 60; // 5 minutes
+ private static final int DEFAULT_TASK_METRICS_EMITTER_FREQUENCY_IN_SECONDS = 5 * 60; // 5 minutes
private static final int DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS = 10 * 60; // 10 minutes
private static final int DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS = -1; // Disabled
private static final int DEFAULT_MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
@@ -490,6 +492,16 @@ public class ControllerConf extends PinotConfiguration {
Integer.toString(statusCheckerFrequencyInSeconds));
}
+ public int getTaskMetricsEmitterFrequencyInSeconds() {
+ return getProperty(ControllerPeriodicTasksConf.TASK_METRICS_EMITTER_FREQUENCY_IN_SECONDS,
+ ControllerPeriodicTasksConf.DEFAULT_TASK_METRICS_EMITTER_FREQUENCY_IN_SECONDS);
+ }
+
+ public void setTaskMetricsEmitterFrequencyInSeconds(int taskMetricsEmitterFrequencyInSeconds) {
+ setProperty(ControllerPeriodicTasksConf.TASK_METRICS_EMITTER_FREQUENCY_IN_SECONDS,
+ Integer.toString(taskMetricsEmitterFrequencyInSeconds));
+ }
+
public int getStatusCheckerWaitForPushTimeInSeconds() {
return getProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS,
ControllerPeriodicTasksConf.DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index ee08384..265d9f3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -29,11 +29,14 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobQueue;
import org.apache.helix.task.TaskConfig;
import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskPartitionState;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
import org.slf4j.Logger;
@@ -42,6 +45,7 @@ import org.slf4j.LoggerFactory;
/**
* The class <code>PinotHelixTaskResourceManager</code> manages all the task resources in Pinot cluster.
+ * In case you are wondering why methods that access taskDriver are synchronized, see comment in PR #1437
*/
public class PinotHelixTaskResourceManager {
private static final Logger LOGGER = LoggerFactory.getLogger(PinotHelixTaskResourceManager.class);
@@ -60,7 +64,8 @@ public class PinotHelixTaskResourceManager {
/**
* Get all task types.
- *
+ * @note: It reads back all resource configs and checks which are workflows and which are jobs, so it can take some
+ * time if there are a lot of tasks.
* @return Set of all task types
*/
public synchronized Set<String> getTaskTypes() {
@@ -226,7 +231,6 @@ public class PinotHelixTaskResourceManager {
return parentTaskName;
}
-
/**
* Get all tasks for the given task type.
*
@@ -249,8 +253,13 @@ public class PinotHelixTaskResourceManager {
* @return Map from task name to task state
*/
public synchronized Map<String, TaskState> getTaskStates(String taskType) {
- Map<String, TaskState> helixJobStates =
- _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)).getJobStates();
+ Map<String, TaskState> helixJobStates = new HashMap<>();
+ WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
+
+ if (workflowContext == null) {
+ return helixJobStates;
+ }
+ helixJobStates = workflowContext.getJobStates();
Map<String, TaskState> taskStates = new HashMap<>(helixJobStates.size());
for (Map.Entry<String, TaskState> entry : helixJobStates.entrySet()) {
taskStates.put(getPinotTaskName(entry.getKey()), entry.getValue());
@@ -259,6 +268,58 @@ public class PinotHelixTaskResourceManager {
}
/**
+ * This method returns a count of sub-tasks in various states, given the top-level task name.
+ * @param parentTaskName (e.g. "Task_TestTask_1624403781879")
+ * @return TaskCount object
+ */
+ public synchronized TaskCount getTaskCount(String parentTaskName) {
+ TaskCount taskCount = new TaskCount();
+ JobContext jobContext = _taskDriver.getJobContext(getHelixJobName(parentTaskName));
+
+ if (jobContext == null) {
+ return taskCount;
+ }
+ Set<Integer> partitionSet = jobContext.getPartitionSet();
+ taskCount.addToTotal(partitionSet.size());
+ for (int partition : partitionSet) {
+ TaskPartitionState state = jobContext.getPartitionState(partition);
+ // Helix returns state as null if the task is not enqueued anywhere yet
+ if (state == null) {
+ // task is not yet assigned to a participant
+ taskCount.addToWaiting(1);
+ } else if (state.equals(TaskPartitionState.INIT) || state.equals(TaskPartitionState.RUNNING)) {
+ taskCount.addToRunning(1);
+ } else if (state.equals(TaskPartitionState.TASK_ERROR)) {
+ taskCount.addToError(1);
+ }
+ }
+ return taskCount;
+ }
+
+ /**
+ * Returns a set of Task names (in the form "Task_TestTask_1624403781879") that are in progress or not started yet.
+ *
+ * @param taskType
+ * @return Set of task names
+ */
+ public synchronized Set<String> getTasksInProgress(String taskType) {
+ Set<String> tasksInProgress = new HashSet<>();
+ WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
+ if (workflowContext == null) {
+ return tasksInProgress;
+ }
+
+ Map<String, TaskState> helixJobStates = workflowContext.getJobStates();
+
+ for (Map.Entry<String, TaskState> entry : helixJobStates.entrySet()) {
+ if (entry.getValue().equals(TaskState.NOT_STARTED) || entry.getValue().equals(TaskState.IN_PROGRESS)) {
+ tasksInProgress.add(getPinotTaskName(entry.getKey()));
+ }
+ }
+ return tasksInProgress;
+ }
+
+ /**
* Get the task state for the given task name.
*
* @param taskName Task name
@@ -330,4 +391,53 @@ public class PinotHelixTaskResourceManager {
private static String getTaskType(String name) {
return name.split(TASK_NAME_SEPARATOR)[1];
}
+
+ public static class TaskCount {
+ private int _waiting; // Number of tasks waiting to be scheduled on minions
+ private int _error; // Number of tasks in error
+ private int _running; // Number of tasks currently running in minions
+ private int _total; // Total number of tasks in the batch
+
+ public TaskCount() {
+ }
+
+ public void addToWaiting(int waiting) {
+ _waiting += waiting;
+ }
+
+ public void addToRunning(int running) {
+ _running += running;
+ }
+
+ public void addToTotal(int total) {
+ _total += total;
+ }
+
+ public void addToError(int error) {
+ _error += error;
+ }
+
+ public int getWaiting() {
+ return _waiting;
+ }
+
+ public int getRunning() {
+ return _running;
+ }
+
+ public int getTotal() {
+ return _total;
+ }
+
+ public int getError() {
+ return _error;
+ }
+
+ public void accumulate(TaskCount other) {
+ addToWaiting(other.getWaiting());
+ addToRunning(other.getRunning());
+ addToError(other.getError());
+ addToTotal(other.getTotal());
+ }
+ }
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index f7a673a..6ae1449 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -401,8 +401,7 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
@Nullable
private String scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig> enabledTableConfigs,
boolean isLeader) {
- LOGGER.info("Trying to schedule task type: {}, with table config: {}, isLeader: {}", taskGenerator.getTaskType(),
- enabledTableConfigs, isLeader);
+ LOGGER.info("Trying to schedule task type: {}, isLeader: {}", taskGenerator.getTaskType(), isLeader);
List<PinotTaskConfig> pinotTaskConfigs = taskGenerator.generateTasks(enabledTableConfigs);
if (!isLeader) {
taskGenerator.nonLeaderCleanUp();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java
new file mode 100644
index 0000000..5874d9e
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion;
+
+import java.util.Set;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager.TaskCount;
+import org.apache.pinot.core.periodictask.BasePeriodicTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class emits task metrics for each type of minion task that is set up in
+ * a Pinot cluster. It is intended to be scheduled with a fairly high frequency,
+ * of the order of minutes.
+ * See ControllerConf class for the default value.
+ */
+public class TaskMetricsEmitter extends BasePeriodicTask {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TaskMetricsEmitter.class);
+ private final static String TASK_NAME = "TaskMetricsEmitter";
+
+ private final PinotHelixTaskResourceManager _helixTaskResourceManager;
+ private final ControllerMetrics _controllerMetrics;
+ private final LeadControllerManager _leadControllerManager;
+
+ public TaskMetricsEmitter(PinotHelixTaskResourceManager helixTaskResourceManager,
+ LeadControllerManager leadControllerManager, ControllerConf controllerConf, ControllerMetrics controllerMetrics) {
+ super(TASK_NAME, controllerConf.getTaskMetricsEmitterFrequencyInSeconds(),
+ controllerConf.getPeriodicTaskInitialDelayInSeconds());
+ _helixTaskResourceManager = helixTaskResourceManager;
+ _controllerMetrics = controllerMetrics;
+ _leadControllerManager = leadControllerManager;
+ }
+
+ @Override
+ protected final void runTask() {
+ // Make it so that only one controller returns the metric for all the tasks.
+ if (!_leadControllerManager.isLeaderForTable(TASK_NAME)) {
+ return;
+ }
+
+ // The call to get task types can take time if there are a lot of tasks.
+ // Potential optimization is to call it every (say) 30m if we detect a barrage of
+ // zk requests.
+ Set<String> taskTypes = _helixTaskResourceManager.getTaskTypes();
+ for (String taskType : taskTypes) {
+ TaskCount accumulated = new TaskCount();
+ try {
+ Set<String> tasksInProgress = _helixTaskResourceManager.getTasksInProgress(taskType);
+ final int numRunningTasks = tasksInProgress.size();
+ for (String task : tasksInProgress) {
+ TaskCount taskCount = _helixTaskResourceManager.getTaskCount(task);
+ accumulated.accumulate(taskCount);
+ }
+ // Emit metrics for taskType.
+ _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.NUM_MINION_TASKS_IN_PROGRESS, taskType, numRunningTasks);
+ _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.NUM_MINION_SUBTASKS_RUNNING, taskType, accumulated.getRunning());
+ _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.NUM_MINION_SUBTASKS_WAITING, taskType, accumulated.getWaiting());
+ _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.NUM_MINION_SUBTASKS_ERROR, taskType, accumulated.getError());
+ int total = accumulated.getTotal();
+ int percent = total != 0 ? (accumulated.getWaiting() + accumulated.getRunning()) * 100 / total : 0;
+ _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PERCENT_MINION_SUBTASKS_IN_QUEUE, taskType, percent);
+ percent = total != 0 ? accumulated.getError() * 100 / total : 0;
+ _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PERCENT_MINION_SUBTASKS_IN_ERROR, taskType, percent);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while getting metrics for task type {}", taskType, e);
+ }
+ }
+ }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index 17e984c..930a41e 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -85,20 +85,44 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
startMinion();
}
+ private void verifyTaskCount(String task, int errors, int waiting, int running, int total) {
+ PinotHelixTaskResourceManager.TaskCount taskCount = _helixTaskResourceManager.getTaskCount(task);
+ assertEquals(taskCount.getError(), errors);
+ assertEquals(taskCount.getWaiting(), waiting);
+ assertEquals(taskCount.getRunning(), running);
+ assertEquals(taskCount.getTotal(), total);
+ }
+
@Test
public void testStopResumeDeleteTaskQueue() {
// Hold the task
HOLD.set(true);
+ // No tasks before we start.
+ assertEquals(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).size(),0);
+ verifyTaskCount("Task_" + TASK_TYPE + "_1624403781879", 0, 0, 0, 0);
// Should create the task queues and generate a task
- assertNotNull(_taskManager.scheduleTasks().get(TASK_TYPE));
+ String task1 = _taskManager.scheduleTasks().get(TASK_TYPE);
+ assertNotNull(task1);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(TASK_TYPE)));
-
- // Should generate one more task
- assertNotNull(_taskManager.scheduleTask(TASK_TYPE));
-
- // Should not generate more tasks
+ assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task1));
+
+ // Since we have two tables, two sub-tasks are generated -- one for each table.
+ // The default concurrent sub-tasks per minion instance is 1, and we have one minion
+ // instance spun up. So, one sub-task gets scheduled in a minion, and the other one
+ // waits.
+ verifyTaskCount(task1, 0, 1, 1, 2);
+ // Should generate one more task, with two sub-tasks. Both of these sub-tasks will wait
+ // since we have one minion instance that is still running one of the sub-tasks.
+ String task2 = _taskManager.scheduleTask(TASK_TYPE);
+ assertNotNull(task2);
+ assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task2));
+ verifyTaskCount(task2, 0, 2, 0, 2);
+
+ // Should not generate more tasks since SimpleMinionClusterIntegrationTest.NUM_TASKS is 2.
+ // Our test task generator does not generate more tasks if there are already this many
+ // sub-tasks in the running+waiting count.
assertNull(_taskManager.scheduleTasks().get(TASK_TYPE));
assertNull(_taskManager.scheduleTask(TASK_TYPE));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org