You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/04/08 18:27:52 UTC
[pinot] branch master updated: Add endpoints for some finer control on minion tasks (#8486)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8e21aec1c2 Add endpoints for some finer control on minion tasks (#8486)
8e21aec1c2 is described below
commit 8e21aec1c2308b8e6b37f8086a696a72c4e03252
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Fri Apr 8 11:27:46 2022 -0700
Add endpoints for some finer control on minion tasks (#8486)
- allow to delete individual tasks w/o deleting the task queue
- inspect the states or configs of the subtasks of a task
---
.../api/resources/PinotTaskRestletResource.java | 32 +++++++++
.../core/minion/PinotHelixTaskResourceManager.java | 76 ++++++++++++++++++++++
.../tests/SimpleMinionClusterIntegrationTest.java | 18 +++--
3 files changed, 119 insertions(+), 7 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index a36a41fafe..12b16edca6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
@@ -37,6 +38,7 @@ import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
+import org.apache.helix.task.TaskPartitionState;
import org.apache.helix.task.TaskState;
import org.apache.pinot.controller.api.access.AccessType;
import org.apache.pinot.controller.api.access.Authenticate;
@@ -205,6 +207,14 @@ public class PinotTaskRestletResource {
return new StringResultResponse(_pinotHelixTaskResourceManager.getTaskState(taskName).toString());
}
+ // Read-only endpoint: reports the state of each sub task of the given task, as returned by the
+ // task resource manager.
+ @GET
+ @Path("/tasks/subtask/{taskName}/state")
+ @ApiOperation("Get the states of all the sub tasks for the given task")
+ public Map<String, TaskPartitionState> getSubtaskStates(
+     @ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName) {
+   Map<String, TaskPartitionState> subtaskStates = _pinotHelixTaskResourceManager.getSubtaskStates(taskName);
+   return subtaskStates;
+ }
+
@GET
@Path("/tasks/task/{taskName}/config")
@ApiOperation("Get the task config (a list of child task configs) for the given task")
@@ -222,6 +232,16 @@ public class PinotTaskRestletResource {
return _pinotHelixTaskResourceManager.getTaskConfigs(taskName);
}
+ // Read-only endpoint: returns sub task configs for the given task. The optional comma-separated
+ // "subtaskNames" query parameter is handed to the task resource manager unchanged.
+ @GET
+ @Path("/tasks/subtask/{taskName}/config")
+ @ApiOperation("Get the configs of specified sub tasks for the given task")
+ public Map<String, PinotTaskConfig> getSubtaskConfigs(
+     @ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName,
+     @ApiParam(value = "Sub task names separated by comma") @QueryParam("subtaskNames") @Nullable
+         String subtaskNames) {
+   Map<String, PinotTaskConfig> subtaskConfigs =
+       _pinotHelixTaskResourceManager.getSubtaskConfigs(taskName, subtaskNames);
+   return subtaskConfigs;
+ }
+
@GET
@Path("/tasks/scheduler/information")
@ApiOperation("Fetch cron scheduler information")
@@ -428,6 +448,18 @@ public class PinotTaskRestletResource {
return new SuccessResponse("Successfully deleted tasks for task type: " + taskType);
}
+ // Deletes a single task by name without deleting its task queue. The actual work is delegated to
+ // the task resource manager; "forceDelete" defaults to false when the query parameter is absent.
+ @DELETE
+ @Path("/tasks/task/{taskName}")
+ @Authenticate(AccessType.DELETE)
+ @ApiOperation("Delete a single task given its task name")
+ public SuccessResponse deleteTask(
+     @ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName,
+     @ApiParam(value = "Whether to force deleting the task (expert only option, enable with caution)")
+     @DefaultValue("false") @QueryParam("forceDelete") boolean forceDelete) {
+   _pinotHelixTaskResourceManager.deleteTask(taskName, forceDelete);
+   return new SuccessResponse("Successfully deleted task: " + taskName);
+ }
+
@Deprecated
@DELETE
@Path("/tasks/taskqueue/{taskType}")
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index e825dbbca7..081377899d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -24,6 +24,7 @@ import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -31,6 +32,8 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.commons.lang.StringUtils;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobQueue;
@@ -162,6 +165,25 @@ public class PinotHelixTaskResourceManager {
_taskDriver.delete(helixJobQueueName, forceDelete);
}
+ /**
+  * Removes one task from its task queue while leaving the queue itself in place. The task queue
+  * should be stopped before the task is deleted; otherwise the deletion fails with an exception.
+  *
+  * @param taskName the task to delete from the queue.
+  * @param forceDelete as described in the Helix documentation, when set to true all of the job's
+  *                    related ZK nodes will be cleaned up from ZooKeeper even if its workflow
+  *                    information cannot be found.
+  */
+ public synchronized void deleteTask(String taskName, boolean forceDelete) {
+   String taskType = getTaskType(taskName);
+   String queueName = getHelixJobQueueName(taskType);
+   // A forced delete is logged at WARN level, a regular delete at INFO.
+   if (!forceDelete) {
+     LOGGER.info("Deleting task: {} from queue: {} of task type: {}", taskName, queueName, taskType);
+   } else {
+     LOGGER.warn("Force deleting task: {} from queue: {} of task type: {}", taskName, queueName, taskType);
+   }
+   _taskDriver.deleteJob(queueName, taskName, forceDelete);
+ }
+
/**
* Get all task queues.
*
@@ -328,6 +350,33 @@ public class PinotHelixTaskResourceManager {
return _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)).getJobState(getHelixJobName(taskName));
}
+ /**
+  * Collects the state of every sub task (Helix task partition) of the given task.
+  *
+  * @param taskName the task whose sub tasks to check
+  * @return a map from sub task id to its partition state; an empty map when either the workflow
+  *         context of the task queue or the job context of the task cannot be found
+  */
+ public synchronized Map<String, TaskPartitionState> getSubtaskStates(String taskName) {
+   String queueName = getHelixJobQueueName(getTaskType(taskName));
+   // Without a workflow context for the queue there is nothing to report.
+   if (_taskDriver.getWorkflowContext(queueName) == null) {
+     return Collections.emptyMap();
+   }
+   JobContext context = _taskDriver.getJobContext(getHelixJobName(taskName));
+   if (context == null) {
+     return Collections.emptyMap();
+   }
+   Map<String, TaskPartitionState> statesBySubtask = new HashMap<>();
+   for (int partitionId : context.getPartitionSet()) {
+     statesBySubtask.put(context.getTaskIdForPartition(partitionId), context.getPartitionState(partitionId));
+   }
+   return statesBySubtask;
+ }
+
/**
* Get the child task configs for the given task name.
*
@@ -344,6 +393,33 @@ public class PinotHelixTaskResourceManager {
return taskConfigs;
}
+ /**
+  * Looks up the configs of sub tasks belonging to the given task.
+  *
+  * @param taskName the task whose sub tasks to check
+  * @param subtaskNames comma-separated names of the sub tasks to check; when null or empty, the
+  *                     configs of all sub tasks are returned
+  * @return a map from sub task name to its config; requested names that are not part of the task
+  *         are left out, and the map is empty when no job config exists for the task
+  */
+ public synchronized Map<String, PinotTaskConfig> getSubtaskConfigs(String taskName, @Nullable String subtaskNames) {
+   JobConfig jobConfig = _taskDriver.getJobConfig(getHelixJobName(taskName));
+   if (jobConfig == null) {
+     return Collections.emptyMap();
+   }
+   Map<String, TaskConfig> helixConfigs = jobConfig.getTaskConfigMap();
+   Map<String, PinotTaskConfig> result = new HashMap<>(helixConfigs.size());
+   if (!StringUtils.isEmpty(subtaskNames)) {
+     // Only the explicitly requested sub tasks; unknown names are skipped silently.
+     for (String requested : StringUtils.split(subtaskNames, ',')) {
+       TaskConfig helixConfig = helixConfigs.get(requested);
+       if (helixConfig == null) {
+         continue;
+       }
+       result.put(requested, PinotTaskConfig.fromHelixTaskConfig(helixConfig));
+     }
+     return result;
+   }
+   // No filter given: convert every sub task config of the job.
+   for (Map.Entry<String, TaskConfig> entry : helixConfigs.entrySet()) {
+     result.put(entry.getKey(), PinotTaskConfig.fromHelixTaskConfig(entry.getValue()));
+   }
+   return result;
+ }
+
/**
* Helper method to return a map of task names to corresponding task state
* where the task corresponds to the given Pinot table name. This is used to
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index d7d1f994f9..b2212bd4db 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -79,8 +79,8 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
// Set task timeout in cluster config
PinotHelixResourceManager helixResourceManager = _controllerStarter.getHelixResourceManager();
helixResourceManager.getHelixAdmin().setConfig(
- new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
- helixResourceManager.getHelixClusterName()).build(),
+ new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
+ .forCluster(helixResourceManager.getHelixClusterName()).build(),
Collections.singletonMap(TASK_TYPE + MinionConstants.TIMEOUT_MS_KEY_SUFFIX, Long.toString(600_000L)));
// Add 3 offline tables, where 2 of them have TestTask enabled
@@ -106,9 +106,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
// Wait for at most 10 seconds for Helix to generate the tasks
TestUtils.waitForCondition((aVoid) -> {
PinotHelixTaskResourceManager.TaskCount taskCount = _helixTaskResourceManager.getTaskCount(task);
- return taskCount.getError() == errors
- && taskCount.getWaiting() == waiting
- && taskCount.getRunning() == running
+ return taskCount.getError() == errors && taskCount.getWaiting() == waiting && taskCount.getRunning() == running
&& taskCount.getTotal() == total;
}, 10_000L, "Failed to reach expected task count");
}
@@ -199,6 +197,10 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
&& controllerMetrics.getValueOfTableGauge(completedGauge, ControllerGauge.TASK_STATUS) == 0,
ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
+ // Task deletion requires the task queue to be stopped,
+ // so deleting task1 here before resuming the task queue.
+ assertTrue(_helixTaskResourceManager.getTaskStates(TASK_TYPE).containsKey(task1));
+ _helixTaskResourceManager.deleteTask(task1, false);
// Resume the task queue, and let the task complete
_helixTaskResourceManager.resumeTaskQueue(TASK_TYPE);
HOLD.set(false);
@@ -206,12 +208,14 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
// Wait at most 60 seconds for all tasks COMPLETED
TestUtils.waitForCondition(input -> {
Collection<TaskState> taskStates = _helixTaskResourceManager.getTaskStates(TASK_TYPE).values();
- assertEquals(taskStates.size(), NUM_TASKS);
for (TaskState taskState : taskStates) {
if (taskState != TaskState.COMPLETED) {
return false;
}
}
+ // Task deletion happens eventually along with other state transitions.
+ assertFalse(_helixTaskResourceManager.getTaskStates(TASK_TYPE).containsKey(task1));
+ assertEquals(taskStates.size(), (NUM_TASKS - 1));
assertTrue(TASK_START_NOTIFIED.get());
assertTrue(TASK_SUCCESS_NOTIFIED.get());
assertTrue(TASK_CANCELLED_NOTIFIED.get());
@@ -223,7 +227,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
TestUtils.waitForCondition(
input -> controllerMetrics.getValueOfTableGauge(inProgressGauge, ControllerGauge.TASK_STATUS) == 0
&& controllerMetrics.getValueOfTableGauge(stoppedGauge, ControllerGauge.TASK_STATUS) == 0
- && controllerMetrics.getValueOfTableGauge(completedGauge, ControllerGauge.TASK_STATUS) == NUM_TASKS,
+ && controllerMetrics.getValueOfTableGauge(completedGauge, ControllerGauge.TASK_STATUS) == (NUM_TASKS - 1),
ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
// Delete the task queue
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org