You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by mc...@apache.org on 2021/06/28 21:01:29 UTC

[incubator-pinot] branch master updated: Added TaskMetricsEmitted periodic controller job (#7091)

This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c950926  Added TaskMetricsEmitted periodic controller job (#7091)
c950926 is described below

commit c9509261672d0f9c2b35fa834eb8fe998ed23161
Author: Subbu Subramaniam <mc...@users.noreply.github.com>
AuthorDate: Mon Jun 28 14:01:14 2021 -0700

    Added TaskMetricsEmitted periodic controller job (#7091)
    
    * Added TaskMetricsEmitted periodic controller job
    
    This job runs every 5 mins by default and emits metrics about Minion tasks
    scheduled in Pinot. For now, the following metrics are emitted for each
    task type:
    
    - Number of running tasks
    - Number of running sub-tasks
    - Number of waiting sub-tasks (not yet assigned to a minion)
    - Number of error sub-tasks (completed with an error/exception)
    - Percent of sub-tasks in error
    - Percent of sub-tasks in waiting or running states.
    
    * Addressed review comments and added unit tests
    
    * Addressed review comments and resolved conflicts
    
    * Cleaned up with spotless:apply
    
    * Handle another null case
    
    * spotless apply
    
    * Fix compile error after merge
    
    * Added comments on the minion integration tests
---
 .../pinot/common/metrics/ControllerGauge.java      |   6 ++
 .../apache/pinot/controller/ControllerConf.java    |  12 +++
 .../core/minion/PinotHelixTaskResourceManager.java | 118 ++++++++++++++++++++-
 .../helix/core/minion/PinotTaskManager.java        |   3 +-
 .../helix/core/minion/TaskMetricsEmitter.java      |  90 ++++++++++++++++
 .../tests/SimpleMinionClusterIntegrationTest.java  |  36 +++++--
 6 files changed, 253 insertions(+), 12 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 1ebeac1..2b91ba0 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -45,6 +45,12 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
   OFFLINE_TABLE_COUNT("TableCount", true),
   DISABLED_TABLE_COUNT("TableCount", true),
   PERIODIC_TASK_NUM_TABLES_PROCESSED("PeriodicTaskNumTablesProcessed", true),
+  NUM_MINION_TASKS_IN_PROGRESS("NumMinionTasksInProgress", true),
+  NUM_MINION_SUBTASKS_WAITING("NumMinionSubtasksWaiting", true),
+  NUM_MINION_SUBTASKS_RUNNING("NumMinionSubtasksRunning", true),
+  NUM_MINION_SUBTASKS_ERROR("NumMinionSubtasksError", true),
+  PERCENT_MINION_SUBTASKS_IN_QUEUE("PercentMinionSubtasksInQueue", true),
+  PERCENT_MINION_SUBTASKS_IN_ERROR("PercentMinionSubtasksInError", true),
 
   // Pinot controller leader
   PINOT_CONTROLLER_LEADER("PinotControllerLeader", true),
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 80bd173..7db3a27 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -98,6 +98,7 @@ public class ControllerConf extends PinotConfiguration {
         "controller.minion.instances.cleanup.task.frequencyInSeconds";
     public static final String MINION_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS =
         "controller.minion.instances.cleanup.task.initialDelaySeconds";
+    public static final String TASK_METRICS_EMITTER_FREQUENCY_IN_SECONDS = "controller.minion.task.metrics.emitter.frequencyInSeconds";
 
     public static final String PINOT_TASK_MANAGER_SCHEDULER_ENABLED = "controller.task.scheduler.enabled";
     @Deprecated
@@ -140,6 +141,7 @@ public class ControllerConf extends PinotConfiguration {
     private static final int DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
     private static final int DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
     private static final int DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS = 5 * 60; // 5 minutes
+    private static final int DEFAULT_TASK_METRICS_EMITTER_FREQUENCY_IN_SECONDS = 5 * 60; // 5 minutes
     private static final int DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS = 10 * 60; // 10 minutes
     private static final int DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS = -1; // Disabled
     private static final int DEFAULT_MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
@@ -490,6 +492,16 @@ public class ControllerConf extends PinotConfiguration {
         Integer.toString(statusCheckerFrequencyInSeconds));
   }
 
+  public int getTaskMetricsEmitterFrequencyInSeconds() {
+    return getProperty(ControllerPeriodicTasksConf.TASK_METRICS_EMITTER_FREQUENCY_IN_SECONDS,
+        ControllerPeriodicTasksConf.DEFAULT_TASK_METRICS_EMITTER_FREQUENCY_IN_SECONDS);
+  }
+
+  public void setTaskMetricsEmitterFrequencyInSeconds(int taskMetricsEmitterFrequencyInSeconds) {
+    setProperty(ControllerPeriodicTasksConf.TASK_METRICS_EMITTER_FREQUENCY_IN_SECONDS,
+        Integer.toString(taskMetricsEmitterFrequencyInSeconds));
+  }
+
   public int getStatusCheckerWaitForPushTimeInSeconds() {
     return getProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS,
         ControllerPeriodicTasksConf.DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index ee08384..265d9f3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -29,11 +29,14 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.TaskConfig;
 import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskPartitionState;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.spi.utils.CommonConstants.Helix;
 import org.slf4j.Logger;
@@ -42,6 +45,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  * The class <code>PinotHelixTaskResourceManager</code> manages all the task resources in Pinot cluster.
+ * In case you are wondering why methods that access taskDriver are synchronized, see comment in PR #1437
  */
 public class PinotHelixTaskResourceManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(PinotHelixTaskResourceManager.class);
@@ -60,7 +64,8 @@ public class PinotHelixTaskResourceManager {
 
   /**
    * Get all task types.
-   *
+   * @note: It reads all resource configs back and checks which are workflows and which are jobs, so it can take some time
+   * if there are a lot of tasks.
    * @return Set of all task types
    */
   public synchronized Set<String> getTaskTypes() {
@@ -226,7 +231,6 @@ public class PinotHelixTaskResourceManager {
 
     return parentTaskName;
   }
-
   /**
    * Get all tasks for the given task type.
    *
@@ -249,8 +253,13 @@ public class PinotHelixTaskResourceManager {
    * @return Map from task name to task state
    */
   public synchronized Map<String, TaskState> getTaskStates(String taskType) {
-    Map<String, TaskState> helixJobStates =
-        _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)).getJobStates();
+    Map<String, TaskState> helixJobStates = new HashMap<>();
+    WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
+
+    if (workflowContext == null) {
+      return helixJobStates;
+    }
+    helixJobStates = workflowContext.getJobStates();
     Map<String, TaskState> taskStates = new HashMap<>(helixJobStates.size());
     for (Map.Entry<String, TaskState> entry : helixJobStates.entrySet()) {
       taskStates.put(getPinotTaskName(entry.getKey()), entry.getValue());
@@ -259,6 +268,58 @@ public class PinotHelixTaskResourceManager {
   }
 
   /**
+   * This method returns a count of sub-tasks in various states, given the top-level task name.
+   * @param parentTaskName (e.g. "Task_TestTask_1624403781879")
+   * @return TaskCount object
+   */
+  public synchronized TaskCount getTaskCount(String parentTaskName) {
+    TaskCount taskCount = new TaskCount();
+    JobContext jobContext = _taskDriver.getJobContext(getHelixJobName(parentTaskName));
+
+    if (jobContext == null) {
+      return taskCount;
+    }
+    Set<Integer> partitionSet = jobContext.getPartitionSet();
+    taskCount.addToTotal(partitionSet.size());
+    for (int partition : partitionSet) {
+      TaskPartitionState state = jobContext.getPartitionState(partition);
+      // Helix returns state as null if the task is not enqueued anywhere yet
+      if (state == null) {
+        // task is not yet assigned to a participant
+        taskCount.addToWaiting(1);
+      } else if (state.equals(TaskPartitionState.INIT) || state.equals(TaskPartitionState.RUNNING)) {
+        taskCount.addToRunning(1);
+      } else if (state.equals(TaskPartitionState.TASK_ERROR)) {
+        taskCount.addToError(1);
+      }
+    }
+    return taskCount;
+  }
+
+  /**
+   * Returns a set of Task names (in the form "Task_TestTask_1624403781879") that are in progress or not started yet.
+   *
+   * @param taskType
+   * @return Set of task names
+   */
+  public synchronized Set<String> getTasksInProgress(String taskType) {
+    Set<String> tasksInProgress = new HashSet<>();
+    WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
+    if (workflowContext == null) {
+      return tasksInProgress;
+    }
+
+    Map<String, TaskState> helixJobStates = workflowContext.getJobStates();
+
+    for (Map.Entry<String, TaskState> entry : helixJobStates.entrySet()) {
+      if (entry.getValue().equals(TaskState.NOT_STARTED) || entry.getValue().equals(TaskState.IN_PROGRESS)) {
+        tasksInProgress.add(getPinotTaskName(entry.getKey()));
+      }
+    }
+    return tasksInProgress;
+  }
+
+  /**
    * Get the task state for the given task name.
    *
    * @param taskName Task name
@@ -330,4 +391,53 @@ public class PinotHelixTaskResourceManager {
   private static String getTaskType(String name) {
     return name.split(TASK_NAME_SEPARATOR)[1];
   }
+
+  public static class TaskCount {
+    private int _waiting;   // Number of tasks waiting to be scheduled on minions
+    private int _error;     // Number of tasks in error
+    private int _running;   // Number of tasks currently running in minions
+    private int _total;     // Total number of tasks in the batch
+
+    public TaskCount() {
+    }
+
+    public void addToWaiting(int waiting) {
+      _waiting += waiting;
+    }
+
+    public void addToRunning(int running) {
+      _running += running;
+    }
+
+    public void addToTotal(int total) {
+      _total += total;
+    }
+
+    public void addToError(int error) {
+      _error += error;
+    }
+
+    public int getWaiting() {
+      return _waiting;
+    }
+
+    public int getRunning() {
+      return _running;
+    }
+
+    public int getTotal() {
+      return _total;
+    }
+
+    public int getError() {
+      return _error;
+    }
+
+    public void accumulate(TaskCount other) {
+      addToWaiting(other.getWaiting());
+      addToRunning(other.getRunning());
+      addToError(other.getError());
+      addToTotal(other.getTotal());
+    }
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index f7a673a..6ae1449 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -401,8 +401,7 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
   @Nullable
   private String scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig> enabledTableConfigs,
       boolean isLeader) {
-    LOGGER.info("Trying to schedule task type: {}, with table config: {}, isLeader: {}", taskGenerator.getTaskType(),
-        enabledTableConfigs, isLeader);
+    LOGGER.info("Trying to schedule task type: {}, isLeader: {}", taskGenerator.getTaskType(), isLeader);
     List<PinotTaskConfig> pinotTaskConfigs = taskGenerator.generateTasks(enabledTableConfigs);
     if (!isLeader) {
       taskGenerator.nonLeaderCleanUp();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java
new file mode 100644
index 0000000..5874d9e
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion;
+
+import java.util.Set;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager.TaskCount;
+import org.apache.pinot.core.periodictask.BasePeriodicTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class emits task metrics for each type of minion task that is set up in
+ * a Pinot cluster. It is intended to be scheduled with a fairly high frequency,
+ * of the order of minutes.
+ * See ControllerConf class for the default value.
+ */
+public class TaskMetricsEmitter extends BasePeriodicTask {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TaskMetricsEmitter.class);
+  private final static String TASK_NAME = "TaskMetricsEmitter";
+
+  private final PinotHelixTaskResourceManager _helixTaskResourceManager;
+  private final ControllerMetrics _controllerMetrics;
+  private final LeadControllerManager _leadControllerManager;
+
+  public TaskMetricsEmitter(PinotHelixTaskResourceManager helixTaskResourceManager,
+      LeadControllerManager leadControllerManager, ControllerConf controllerConf, ControllerMetrics controllerMetrics) {
+    super(TASK_NAME, controllerConf.getTaskMetricsEmitterFrequencyInSeconds(),
+        controllerConf.getPeriodicTaskInitialDelayInSeconds());
+    _helixTaskResourceManager = helixTaskResourceManager;
+    _controllerMetrics = controllerMetrics;
+    _leadControllerManager = leadControllerManager;
+  }
+
+  @Override
+  protected final void runTask() {
+    // Make it so that only one controller returns the metric for all the tasks.
+    if (!_leadControllerManager.isLeaderForTable(TASK_NAME)) {
+      return;
+    }
+
+    // The call to get task types can take time if there are a lot of tasks.
+    // Potential optimization is to call it every (say) 30m if we detect a barrage of
+    // zk requests.
+    Set<String> taskTypes = _helixTaskResourceManager.getTaskTypes();
+    for (String taskType : taskTypes) {
+      TaskCount accumulated = new TaskCount();
+      try {
+        Set<String> tasksInProgress = _helixTaskResourceManager.getTasksInProgress(taskType);
+        final int numRunningTasks = tasksInProgress.size();
+        for (String task : tasksInProgress) {
+          TaskCount taskCount = _helixTaskResourceManager.getTaskCount(task);
+          accumulated.accumulate(taskCount);
+        }
+        // Emit metrics for taskType.
+        _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.NUM_MINION_TASKS_IN_PROGRESS, taskType, numRunningTasks);
+        _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.NUM_MINION_SUBTASKS_RUNNING, taskType, accumulated.getRunning());
+        _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.NUM_MINION_SUBTASKS_WAITING, taskType, accumulated.getWaiting());
+        _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.NUM_MINION_SUBTASKS_ERROR, taskType, accumulated.getError());
+        int total = accumulated.getTotal();
+        int percent = total != 0 ? (accumulated.getWaiting() + accumulated.getRunning()) * 100 / total : 0;
+        _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PERCENT_MINION_SUBTASKS_IN_QUEUE, taskType, percent);
+        percent = total != 0 ? accumulated.getError() * 100 / total : 0;
+        _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PERCENT_MINION_SUBTASKS_IN_ERROR, taskType, percent);
+      } catch (Exception e) {
+        LOGGER.error("Caught exception while getting metrics for task type {}", taskType, e);
+      }
+    }
+  }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index 17e984c..930a41e 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -85,20 +85,44 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
     startMinion();
   }
 
+  private void verifyTaskCount(String task, int errors, int waiting, int running, int total) {
+    PinotHelixTaskResourceManager.TaskCount taskCount = _helixTaskResourceManager.getTaskCount(task);
+    assertEquals(taskCount.getError(), errors);
+    assertEquals(taskCount.getWaiting(), waiting);
+    assertEquals(taskCount.getRunning(), running);
+    assertEquals(taskCount.getTotal(), total);
+  }
+
   @Test
   public void testStopResumeDeleteTaskQueue() {
     // Hold the task
     HOLD.set(true);
+    // No tasks before we start.
+    assertEquals(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).size(),0);
+    verifyTaskCount("Task_" + TASK_TYPE + "_1624403781879", 0, 0, 0, 0);
 
     // Should create the task queues and generate a task
-    assertNotNull(_taskManager.scheduleTasks().get(TASK_TYPE));
+    String task1 = _taskManager.scheduleTasks().get(TASK_TYPE);
+    assertNotNull(task1);
     assertTrue(_helixTaskResourceManager.getTaskQueues()
         .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(TASK_TYPE)));
-
-    // Should generate one more task
-    assertNotNull(_taskManager.scheduleTask(TASK_TYPE));
-
-    // Should not generate more tasks
+    assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task1));
+
+    // Since we have two tables, two sub-tasks are generated -- one for each table.
+    // The default concurrent sub-tasks per minion instance is 1, and we have one minion
+    // instance spun up. So, one sub-task gets scheduled in a minion, and the other one
+    // waits.
+    verifyTaskCount(task1, 0, 1, 1, 2);
+    // Should generate one more task, with two sub-tasks. Both of these sub-tasks will wait
+    // since we have one minion instance that is still running one of the sub-tasks.
+    String task2 = _taskManager.scheduleTask(TASK_TYPE);
+    assertNotNull(task2);
+    assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task2));
+    verifyTaskCount(task2, 0, 2, 0, 2);
+
+    // Should not generate more tasks since SimpleMinionClusterIntegrationTests.NUM_TASKS is 2.
+    // Our test task generator does not generate if there are already this many sub-tasks in the
+    // running+waiting count already.
     assertNull(_taskManager.scheduleTasks().get(TASK_TYPE));
     assertNull(_taskManager.scheduleTask(TASK_TYPE));
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org