You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/04/08 18:27:52 UTC

[pinot] branch master updated: Add endpoints for some finer control on minion tasks (#8486)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e21aec1c2 Add endpoints for some finer control on minion tasks (#8486)
8e21aec1c2 is described below

commit 8e21aec1c2308b8e6b37f8086a696a72c4e03252
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Fri Apr 8 11:27:46 2022 -0700

    Add endpoints for some finer control on minion tasks (#8486)
    
    - allow deleting individual tasks without deleting the task queue
    - inspect the states or configs of the subtasks of a task
---
 .../api/resources/PinotTaskRestletResource.java    | 32 +++++++++
 .../core/minion/PinotHelixTaskResourceManager.java | 76 ++++++++++++++++++++++
 .../tests/SimpleMinionClusterIntegrationTest.java  | 18 +++--
 3 files changed, 119 insertions(+), 7 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index a36a41fafe..12b16edca6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import javax.annotation.Nullable;
 import javax.inject.Inject;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
@@ -37,6 +38,7 @@ import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.QueryParam;
+import org.apache.helix.task.TaskPartitionState;
 import org.apache.helix.task.TaskState;
 import org.apache.pinot.controller.api.access.AccessType;
 import org.apache.pinot.controller.api.access.Authenticate;
@@ -205,6 +207,14 @@ public class PinotTaskRestletResource {
     return new StringResultResponse(_pinotHelixTaskResourceManager.getTaskState(taskName).toString());
   }
 
+  @GET
+  @Path("/tasks/subtask/{taskName}/state")
+  @ApiOperation("Get the states of all the sub tasks for the given task")
+  public Map<String, TaskPartitionState> getSubtaskStates(
+      @ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName) {
+    return _pinotHelixTaskResourceManager.getSubtaskStates(taskName);
+  }
+
   @GET
   @Path("/tasks/task/{taskName}/config")
   @ApiOperation("Get the task config (a list of child task configs) for the given task")
@@ -222,6 +232,16 @@ public class PinotTaskRestletResource {
     return _pinotHelixTaskResourceManager.getTaskConfigs(taskName);
   }
 
+  @GET
+  @Path("/tasks/subtask/{taskName}/config")
+  @ApiOperation("Get the configs of specified sub tasks for the given task")
+  public Map<String, PinotTaskConfig> getSubtaskConfigs(
+      @ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName,
+      @ApiParam(value = "Sub task names separated by comma") @QueryParam("subtaskNames") @Nullable
+          String subtaskNames) {
+    return _pinotHelixTaskResourceManager.getSubtaskConfigs(taskName, subtaskNames);
+  }
+
   @GET
   @Path("/tasks/scheduler/information")
   @ApiOperation("Fetch cron scheduler information")
@@ -428,6 +448,18 @@ public class PinotTaskRestletResource {
     return new SuccessResponse("Successfully deleted tasks for task type: " + taskType);
   }
 
+  @DELETE
+  @Path("/tasks/task/{taskName}")
+  @Authenticate(AccessType.DELETE)
+  @ApiOperation("Delete a single task given its task name")
+  public SuccessResponse deleteTask(
+      @ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName,
+      @ApiParam(value = "Whether to force deleting the task (expert only option, enable with cautious")
+      @DefaultValue("false") @QueryParam("forceDelete") boolean forceDelete) {
+    _pinotHelixTaskResourceManager.deleteTask(taskName, forceDelete);
+    return new SuccessResponse("Successfully deleted task: " + taskName);
+  }
+
   @Deprecated
   @DELETE
   @Path("/tasks/taskqueue/{taskType}")
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index e825dbbca7..081377899d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -24,6 +24,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -31,6 +32,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.commons.lang.StringUtils;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobQueue;
@@ -162,6 +165,25 @@ public class PinotHelixTaskResourceManager {
     _taskDriver.delete(helixJobQueueName, forceDelete);
   }
 
+  /**
+   * Delete a single task from the task queue. The task queue must be
+   * stopped before deleting the task; otherwise the deletion fails with an exception.
+   *
+   * @param taskName the task to delete from the queue.
+   * @param forceDelete as noted in the Helix comment, if set to true, all ZK nodes related to the job
+   *                    will be cleaned up from ZooKeeper even if its workflow information cannot be found.
+   */
+  public synchronized void deleteTask(String taskName, boolean forceDelete) {
+    String taskType = getTaskType(taskName);
+    String helixJobQueueName = getHelixJobQueueName(taskType);
+    if (forceDelete) {
+      LOGGER.warn("Force deleting task: {} from queue: {} of task type: {}", taskName, helixJobQueueName, taskType);
+    } else {
+      LOGGER.info("Deleting task: {} from queue: {} of task type: {}", taskName, helixJobQueueName, taskType);
+    }
+    _taskDriver.deleteJob(helixJobQueueName, taskName, forceDelete);
+  }
+
   /**
    * Get all task queues.
    *
@@ -328,6 +350,33 @@ public class PinotHelixTaskResourceManager {
     return _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)).getJobState(getHelixJobName(taskName));
   }
 
+  /**
+   * Get states of all the sub tasks for a given task.
+   *
+   * @param taskName the task whose sub tasks to check
+   * @return sub task states keyed by sub task id, or an empty map if the workflow or job context is not found
+   */
+  public synchronized Map<String, TaskPartitionState> getSubtaskStates(String taskName) {
+    String taskType = getTaskType(taskName);
+    WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
+    if (workflowContext == null) {
+      return Collections.emptyMap();
+    }
+    String helixJobName = getHelixJobName(taskName);
+    JobContext jobContext = _taskDriver.getJobContext(helixJobName);
+    if (jobContext == null) {
+      return Collections.emptyMap();
+    }
+    Map<String, TaskPartitionState> subtaskStates = new HashMap<>();
+    Set<Integer> partitionSet = jobContext.getPartitionSet();
+    for (int partition : partitionSet) {
+      String taskIdForPartition = jobContext.getTaskIdForPartition(partition);
+      TaskPartitionState partitionState = jobContext.getPartitionState(partition);
+      subtaskStates.put(taskIdForPartition, partitionState);
+    }
+    return subtaskStates;
+  }
+
   /**
    * Get the child task configs for the given task name.
    *
@@ -344,6 +393,33 @@ public class PinotHelixTaskResourceManager {
     return taskConfigs;
   }
 
+  /**
+   * Get configs of the specified sub tasks for a given task, or of all its sub tasks if none are specified.
+   *
+   * @param taskName the task whose sub tasks to check
+   * @param subtaskNames comma-separated names of the sub tasks to check; null or empty selects all sub tasks
+   * @return the configs of the sub tasks
+   */
+  public synchronized Map<String, PinotTaskConfig> getSubtaskConfigs(String taskName, @Nullable String subtaskNames) {
+    JobConfig jobConfig = _taskDriver.getJobConfig(getHelixJobName(taskName));
+    if (jobConfig == null) {
+      return Collections.emptyMap();
+    }
+    Map<String, TaskConfig> helixTaskConfigs = jobConfig.getTaskConfigMap();
+    Map<String, PinotTaskConfig> taskConfigs = new HashMap<>(helixTaskConfigs.size());
+    if (StringUtils.isEmpty(subtaskNames)) {
+      helixTaskConfigs.forEach((sub, cfg) -> taskConfigs.put(sub, PinotTaskConfig.fromHelixTaskConfig(cfg)));
+      return taskConfigs;
+    }
+    for (String subtaskName : StringUtils.split(subtaskNames, ',')) {
+      TaskConfig taskConfig = helixTaskConfigs.get(subtaskName);
+      if (taskConfig != null) {
+        taskConfigs.put(subtaskName, PinotTaskConfig.fromHelixTaskConfig(taskConfig));
+      }
+    }
+    return taskConfigs;
+  }
+
   /**
    * Helper method to return a map of task names to corresponding task state
    * where the task corresponds to the given Pinot table name. This is used to
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index d7d1f994f9..b2212bd4db 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -79,8 +79,8 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
     // Set task timeout in cluster config
     PinotHelixResourceManager helixResourceManager = _controllerStarter.getHelixResourceManager();
     helixResourceManager.getHelixAdmin().setConfig(
-        new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
-            helixResourceManager.getHelixClusterName()).build(),
+        new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
+            .forCluster(helixResourceManager.getHelixClusterName()).build(),
         Collections.singletonMap(TASK_TYPE + MinionConstants.TIMEOUT_MS_KEY_SUFFIX, Long.toString(600_000L)));
 
     // Add 3 offline tables, where 2 of them have TestTask enabled
@@ -106,9 +106,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
     // Wait for at most 10 seconds for Helix to generate the tasks
     TestUtils.waitForCondition((aVoid) -> {
       PinotHelixTaskResourceManager.TaskCount taskCount = _helixTaskResourceManager.getTaskCount(task);
-      return taskCount.getError() == errors
-          && taskCount.getWaiting() == waiting
-          && taskCount.getRunning() == running
+      return taskCount.getError() == errors && taskCount.getWaiting() == waiting && taskCount.getRunning() == running
           && taskCount.getTotal() == total;
     }, 10_000L, "Failed to reach expected task count");
   }
@@ -199,6 +197,10 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
             && controllerMetrics.getValueOfTableGauge(completedGauge, ControllerGauge.TASK_STATUS) == 0,
         ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
 
+    // Task deletion requires the task queue to be stopped,
+    // so deleting task1 here before resuming the task queue.
+    assertTrue(_helixTaskResourceManager.getTaskStates(TASK_TYPE).containsKey(task1));
+    _helixTaskResourceManager.deleteTask(task1, false);
     // Resume the task queue, and let the task complete
     _helixTaskResourceManager.resumeTaskQueue(TASK_TYPE);
     HOLD.set(false);
@@ -206,12 +208,14 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
     // Wait at most 60 seconds for all tasks COMPLETED
     TestUtils.waitForCondition(input -> {
       Collection<TaskState> taskStates = _helixTaskResourceManager.getTaskStates(TASK_TYPE).values();
-      assertEquals(taskStates.size(), NUM_TASKS);
       for (TaskState taskState : taskStates) {
         if (taskState != TaskState.COMPLETED) {
           return false;
         }
       }
+      // Task deletion happens eventually along with other state transitions.
+      assertFalse(_helixTaskResourceManager.getTaskStates(TASK_TYPE).containsKey(task1));
+      assertEquals(taskStates.size(), (NUM_TASKS - 1));
       assertTrue(TASK_START_NOTIFIED.get());
       assertTrue(TASK_SUCCESS_NOTIFIED.get());
       assertTrue(TASK_CANCELLED_NOTIFIED.get());
@@ -223,7 +227,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
     TestUtils.waitForCondition(
         input -> controllerMetrics.getValueOfTableGauge(inProgressGauge, ControllerGauge.TASK_STATUS) == 0
             && controllerMetrics.getValueOfTableGauge(stoppedGauge, ControllerGauge.TASK_STATUS) == 0
-            && controllerMetrics.getValueOfTableGauge(completedGauge, ControllerGauge.TASK_STATUS) == NUM_TASKS,
+            && controllerMetrics.getValueOfTableGauge(completedGauge, ControllerGauge.TASK_STATUS) == (NUM_TASKS - 1),
         ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
 
     // Delete the task queue


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org