You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/10/06 21:06:21 UTC
[pinot] branch master updated: get task runtime configs tracked in Helix (#9540)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1c202c7ea0 get task runtime configs tracked in Helix (#9540)
1c202c7ea0 is described below
commit 1c202c7ea0a184de4c216943b1e36ba313431d22
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Thu Oct 6 14:06:15 2022 -0700
get task runtime configs tracked in Helix (#9540)
---
.../api/resources/PinotTaskRestletResource.java | 20 +++++++++++++------
.../helix/core/minion/ClusterInfoAccessor.java | 2 +-
.../core/minion/PinotHelixTaskResourceManager.java | 23 +++++++++++++++++++---
.../MergeRollupMinionClusterIntegrationTest.java | 6 +++---
.../pinot/minion/event/MinionEventObservers.java | 9 ++++++++-
5 files changed, 46 insertions(+), 14 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index e0535be493..c2605c7614 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -292,10 +292,9 @@ public class PinotTaskRestletResource {
// Relying on original schema that was used to query the controller
URI uri = _uriInfo.getRequestUri();
String scheme = uri.getScheme();
- List<String> controllerUrls = controllers.stream().map(controller -> {
- return String.format("%s://%s:%d/tasks/generator/%s/%s/debug?localOnly=true", scheme, controller.getHostName(),
- Integer.parseInt(controller.getPort()), tableNameWithType, taskType);
- }).collect(Collectors.toList());
+ List<String> controllerUrls = controllers.stream().map(controller -> String
+ .format("%s://%s:%d/tasks/generator/%s/%s/debug?localOnly=true", scheme, controller.getHostName(),
+ Integer.parseInt(controller.getPort()), tableNameWithType, taskType)).collect(Collectors.toList());
CompletionServiceHelper completionServiceHelper =
new CompletionServiceHelper(_executor, _connectionManager, HashBiMap.create(0));
@@ -303,6 +302,7 @@ public class PinotTaskRestletResource {
httpHeaders.getRequestHeaders().keySet().forEach(header -> {
requestHeaders.put(header, httpHeaders.getHeaderString(header));
});
+ LOGGER.debug("Getting task generation info with controllerUrls: {}", controllerUrls);
CompletionServiceHelper.CompletionServiceResponse serviceResponse =
completionServiceHelper.doMultiGetRequest(controllerUrls, null, true, requestHeaders, 10000);
@@ -386,7 +386,15 @@ public class PinotTaskRestletResource {
@ApiOperation("Get the task config (a list of child task configs) for the given task")
public List<PinotTaskConfig> getTaskConfigs(
@ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName) {
- return _pinotHelixTaskResourceManager.getTaskConfigs(taskName);
+ return _pinotHelixTaskResourceManager.getSubtaskConfigs(taskName);
+ }
+
+ @GET
+ @Path("/tasks/task/{taskName}/runtime/config")
+ @ApiOperation("Get the task runtime config for the given task")
+ public Map<String, String> getTaskConfig(
+ @ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName) {
+ return _pinotHelixTaskResourceManager.getTaskRuntimeConfig(taskName);
}
@Deprecated
@@ -395,7 +403,7 @@ public class PinotTaskRestletResource {
@ApiOperation("Get the task config (a list of child task configs) for the given task (deprecated)")
public List<PinotTaskConfig> getTaskConfigsDeprecated(
@ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName) {
- return _pinotHelixTaskResourceManager.getTaskConfigs(taskName);
+ return _pinotHelixTaskResourceManager.getSubtaskConfigs(taskName);
}
@GET
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
index 14f5e0ee2c..bcb1491988 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
@@ -148,7 +148,7 @@ public class ClusterInfoAccessor {
* @return List of child task configs
*/
public List<PinotTaskConfig> getTaskConfigs(String taskName) {
- return _pinotHelixTaskResourceManager.getTaskConfigs(taskName);
+ return _pinotHelixTaskResourceManager.getSubtaskConfigs(taskName);
}
/**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index 04cb52481f..48e3717a72 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -434,7 +434,7 @@ public class PinotHelixTaskResourceManager {
* @param taskName Task name
* @return List of child task configs
*/
- public synchronized List<PinotTaskConfig> getTaskConfigs(String taskName) {
+ public synchronized List<PinotTaskConfig> getSubtaskConfigs(String taskName) {
Collection<TaskConfig> helixTaskConfigs =
_taskDriver.getJobConfig(getHelixJobName(taskName)).getTaskConfigMap().values();
List<PinotTaskConfig> taskConfigs = new ArrayList<>(helixTaskConfigs.size());
@@ -444,6 +444,23 @@ public class PinotHelixTaskResourceManager {
return taskConfigs;
}
+ /**
+ * Get the task runtime config for the given task name. A task can have multiple subtasks, whose configs can be
+ * retrieved via the getSubtaskConfigs() method instead.
+ *
+ * @param taskName Task name
+ * @return Configs for the task returned as a Map.
+ */
+ public synchronized Map<String, String> getTaskRuntimeConfig(String taskName) {
+ JobConfig jobConfig = _taskDriver.getJobConfig(getHelixJobName(taskName));
+ HashMap<String, String> configs = new HashMap<>();
+ configs.put("ConcurrentTasksPerWorker", String.valueOf(jobConfig.getNumConcurrentTasksPerInstance()));
+ configs.put("TaskTimeoutMs", String.valueOf(jobConfig.getTimeoutPerTask()));
+ configs.put("TaskExpireTimeMs", String.valueOf(jobConfig.getExpiry()));
+ configs.put("MinionWorkerGroupTag", jobConfig.getInstanceGroupTag());
+ return configs;
+ }
+
/**
* Get configs of the specified sub task for a given task.
*
@@ -567,7 +584,7 @@ public class PinotHelixTaskResourceManager {
String taskName = taskState.getKey();
// Iterate through all task configs associated with this task name
- for (PinotTaskConfig taskConfig : getTaskConfigs(taskName)) {
+ for (PinotTaskConfig taskConfig : getSubtaskConfigs(taskName)) {
Map<String, String> pinotConfigs = taskConfig.getConfigs();
// Filter task configs that matches this table name
@@ -650,7 +667,7 @@ public class PinotHelixTaskResourceManager {
String pinotTaskName = getPinotTaskName(helixJobName);
// Iterate through all task configs associated with this task name
- for (PinotTaskConfig taskConfig : getTaskConfigs(pinotTaskName)) {
+ for (PinotTaskConfig taskConfig : getSubtaskConfigs(pinotTaskName)) {
Map<String, String> pinotConfigs = taskConfig.getConfigs();
// Filter task configs that matches this table name
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
index e80e9cfbb9..5846eba2d3 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
@@ -288,7 +288,7 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
for (String tasks = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE);
tasks != null; tasks =
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) {
- assertEquals(_helixTaskResourceManager.getTaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
+ assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
@@ -393,7 +393,7 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
for (String tasks = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE);
tasks != null; tasks =
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) {
- assertEquals(_helixTaskResourceManager.getTaskConfigs(tasks).size(), 1);
+ assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), 1);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
@@ -541,7 +541,7 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
for (String tasks = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE);
tasks != null; tasks =
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) {
- assertEquals(_helixTaskResourceManager.getTaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
+ assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObservers.java b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObservers.java
index 440b38a787..46befe39a8 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObservers.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObservers.java
@@ -62,6 +62,7 @@ public class MinionEventObservers {
LOGGER.info("Configured to clean up task event observers immediately");
return;
}
+ LOGGER.info("Configured to clean up task event observers with cleanupDelayMs: {}", _eventObserverCleanupDelayMs);
_cleanupExecutor.submit(() -> {
LOGGER.info("Start to cleanup task event observers with cleanupDelayMs: {}", _eventObserverCleanupDelayMs);
while (!Thread.interrupted()) {
@@ -97,7 +98,13 @@ public class MinionEventObservers {
}
public static MinionEventObservers getInstance() {
- return _customInstance != null ? _customInstance : DEFAULT_INSTANCE;
+ if (_customInstance != null) {
+ return _customInstance;
+ }
+ // Test code might reach here, but this should never happen in production, because the instance is created when
+ // the worker starts, before any tasks can run. Log a warning for debugging just in case.
+ LOGGER.warn("Using default MinionEventObservers instance");
+ return DEFAULT_INSTANCE;
}
public MinionEventObserver getMinionEventObserver(String taskId) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org