You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/10/06 21:06:21 UTC

[pinot] branch master updated: get task runtime configs tracked in Helix (#9540)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c202c7ea0 get task runtime configs tracked in Helix (#9540)
1c202c7ea0 is described below

commit 1c202c7ea0a184de4c216943b1e36ba313431d22
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Thu Oct 6 14:06:15 2022 -0700

    get task runtime configs tracked in Helix (#9540)
---
 .../api/resources/PinotTaskRestletResource.java    | 20 +++++++++++++------
 .../helix/core/minion/ClusterInfoAccessor.java     |  2 +-
 .../core/minion/PinotHelixTaskResourceManager.java | 23 +++++++++++++++++++---
 .../MergeRollupMinionClusterIntegrationTest.java   |  6 +++---
 .../pinot/minion/event/MinionEventObservers.java   |  9 ++++++++-
 5 files changed, 46 insertions(+), 14 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index e0535be493..c2605c7614 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -292,10 +292,9 @@ public class PinotTaskRestletResource {
     // Relying on original schema that was used to query the controller
     URI uri = _uriInfo.getRequestUri();
     String scheme = uri.getScheme();
-    List<String> controllerUrls = controllers.stream().map(controller -> {
-      return String.format("%s://%s:%d/tasks/generator/%s/%s/debug?localOnly=true", scheme, controller.getHostName(),
-          Integer.parseInt(controller.getPort()), tableNameWithType, taskType);
-    }).collect(Collectors.toList());
+    List<String> controllerUrls = controllers.stream().map(controller -> String
+        .format("%s://%s:%d/tasks/generator/%s/%s/debug?localOnly=true", scheme, controller.getHostName(),
+            Integer.parseInt(controller.getPort()), tableNameWithType, taskType)).collect(Collectors.toList());
 
     CompletionServiceHelper completionServiceHelper =
         new CompletionServiceHelper(_executor, _connectionManager, HashBiMap.create(0));
@@ -303,6 +302,7 @@ public class PinotTaskRestletResource {
     httpHeaders.getRequestHeaders().keySet().forEach(header -> {
       requestHeaders.put(header, httpHeaders.getHeaderString(header));
     });
+    LOGGER.debug("Getting task generation info with controllerUrls: {}", controllerUrls);
     CompletionServiceHelper.CompletionServiceResponse serviceResponse =
         completionServiceHelper.doMultiGetRequest(controllerUrls, null, true, requestHeaders, 10000);
 
@@ -386,7 +386,15 @@ public class PinotTaskRestletResource {
   @ApiOperation("Get the task config (a list of child task configs) for the given task")
   public List<PinotTaskConfig> getTaskConfigs(
       @ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName) {
-    return _pinotHelixTaskResourceManager.getTaskConfigs(taskName);
+    return _pinotHelixTaskResourceManager.getSubtaskConfigs(taskName);
+  }
+
+  @GET
+  @Path("/tasks/task/{taskName}/runtime/config")
+  @ApiOperation("Get the task runtime config for the given task")
+  public Map<String, String> getTaskConfig(
+      @ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName) {
+    return _pinotHelixTaskResourceManager.getTaskRuntimeConfig(taskName);
   }
 
   @Deprecated
@@ -395,7 +403,7 @@ public class PinotTaskRestletResource {
   @ApiOperation("Get the task config (a list of child task configs) for the given task (deprecated)")
   public List<PinotTaskConfig> getTaskConfigsDeprecated(
       @ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName) {
-    return _pinotHelixTaskResourceManager.getTaskConfigs(taskName);
+    return _pinotHelixTaskResourceManager.getSubtaskConfigs(taskName);
   }
 
   @GET
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
index 14f5e0ee2c..bcb1491988 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
@@ -148,7 +148,7 @@ public class ClusterInfoAccessor {
    * @return List of child task configs
    */
   public List<PinotTaskConfig> getTaskConfigs(String taskName) {
-    return _pinotHelixTaskResourceManager.getTaskConfigs(taskName);
+    return _pinotHelixTaskResourceManager.getSubtaskConfigs(taskName);
   }
 
   /**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index 04cb52481f..48e3717a72 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -434,7 +434,7 @@ public class PinotHelixTaskResourceManager {
    * @param taskName Task name
    * @return List of child task configs
    */
-  public synchronized List<PinotTaskConfig> getTaskConfigs(String taskName) {
+  public synchronized List<PinotTaskConfig> getSubtaskConfigs(String taskName) {
     Collection<TaskConfig> helixTaskConfigs =
         _taskDriver.getJobConfig(getHelixJobName(taskName)).getTaskConfigMap().values();
     List<PinotTaskConfig> taskConfigs = new ArrayList<>(helixTaskConfigs.size());
@@ -444,6 +444,23 @@ public class PinotHelixTaskResourceManager {
     return taskConfigs;
   }
 
+  /**
+   * Get the task runtime config for the given task name. A task can have multiple subtasks, whose configs can be
+   * retrieved via the getSubtaskConfigs() method instead.
+   *
+   * @param taskName Task name
+   * @return Configs for the task returned as a Map.
+   */
+  public synchronized Map<String, String> getTaskRuntimeConfig(String taskName) {
+    JobConfig jobConfig = _taskDriver.getJobConfig(getHelixJobName(taskName));
+    HashMap<String, String> configs = new HashMap<>();
+    configs.put("ConcurrentTasksPerWorker", String.valueOf(jobConfig.getNumConcurrentTasksPerInstance()));
+    configs.put("TaskTimeoutMs", String.valueOf(jobConfig.getTimeoutPerTask()));
+    configs.put("TaskExpireTimeMs", String.valueOf(jobConfig.getExpiry()));
+    configs.put("MinionWorkerGroupTag", jobConfig.getInstanceGroupTag());
+    return configs;
+  }
+
   /**
    * Get configs of the specified sub task for a given task.
    *
@@ -567,7 +584,7 @@ public class PinotHelixTaskResourceManager {
       String taskName = taskState.getKey();
 
       // Iterate through all task configs associated with this task name
-      for (PinotTaskConfig taskConfig : getTaskConfigs(taskName)) {
+      for (PinotTaskConfig taskConfig : getSubtaskConfigs(taskName)) {
         Map<String, String> pinotConfigs = taskConfig.getConfigs();
 
         // Filter task configs that matches this table name
@@ -650,7 +667,7 @@ public class PinotHelixTaskResourceManager {
       String pinotTaskName = getPinotTaskName(helixJobName);
 
       // Iterate through all task configs associated with this task name
-      for (PinotTaskConfig taskConfig : getTaskConfigs(pinotTaskName)) {
+      for (PinotTaskConfig taskConfig : getSubtaskConfigs(pinotTaskName)) {
         Map<String, String> pinotConfigs = taskConfig.getConfigs();
 
         // Filter task configs that matches this table name
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
index e80e9cfbb9..5846eba2d3 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
@@ -288,7 +288,7 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
     for (String tasks = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE);
         tasks != null; tasks =
         _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) {
-      assertEquals(_helixTaskResourceManager.getTaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
+      assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
       assertTrue(_helixTaskResourceManager.getTaskQueues()
           .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
       // Will not schedule task if there's incomplete task
@@ -393,7 +393,7 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
     for (String tasks = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE);
         tasks != null; tasks =
         _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) {
-      assertEquals(_helixTaskResourceManager.getTaskConfigs(tasks).size(), 1);
+      assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), 1);
       assertTrue(_helixTaskResourceManager.getTaskQueues()
           .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
       // Will not schedule task if there's incomplete task
@@ -541,7 +541,7 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
     for (String tasks = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE);
         tasks != null; tasks =
         _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) {
-      assertEquals(_helixTaskResourceManager.getTaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
+      assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
       assertTrue(_helixTaskResourceManager.getTaskQueues()
           .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
       // Will not schedule task if there's incomplete task
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObservers.java b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObservers.java
index 440b38a787..46befe39a8 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObservers.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObservers.java
@@ -62,6 +62,7 @@ public class MinionEventObservers {
       LOGGER.info("Configured to clean up task event observers immediately");
       return;
     }
+    LOGGER.info("Configured to clean up task event observers with cleanupDelayMs: {}", _eventObserverCleanupDelayMs);
     _cleanupExecutor.submit(() -> {
       LOGGER.info("Start to cleanup task event observers with cleanupDelayMs: {}", _eventObserverCleanupDelayMs);
       while (!Thread.interrupted()) {
@@ -97,7 +98,13 @@ public class MinionEventObservers {
   }
 
   public static MinionEventObservers getInstance() {
-    return _customInstance != null ? _customInstance : DEFAULT_INSTANCE;
+    if (_customInstance != null) {
+      return _customInstance;
+    }
+    // Test code might reach here, but this should never happen in production, as the instance is created when the
+    // worker starts, before any tasks can run. Log a warning for debugging just in case.
+    LOGGER.warn("Using default MinionEventObservers instance");
+    return DEFAULT_INSTANCE;
   }
 
   public MinionEventObserver getMinionEventObserver(String taskId) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org