Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/09/20 22:46:49 UTC

[GitHub] [pinot] klsince commented on a diff in pull request #9432: use MinionEventObserver to track finer grained task progress status on worker

klsince commented on code in PR #9432:
URL: https://github.com/apache/pinot/pull/9432#discussion_r975873009


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java:
##########
@@ -456,6 +461,75 @@ public synchronized Map<String, PinotTaskConfig> getSubtaskConfigs(String taskNa
     return taskConfigs;
   }
 
+  public synchronized Map<String, String> getSubtaskProgress(String taskName, @Nullable String subtaskNames,
+      Executor executor, HttpConnectionManager connMgr, Map<String, String> workerEndpoints,
+      Map<String, String> requestHeaders, int timeoutMs)
+      throws Exception {
+    String taskType = getTaskType(taskName);
+    WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
+    if (workflowContext == null) {
+      throw new UnknownTaskTypeException("Workflow context for task type doesn't exist: " + taskType);
+    }
+    String helixJobName = getHelixJobName(taskName);
+    JobContext jobContext = _taskDriver.getJobContext(helixJobName);
+    if (jobContext == null) {
+      throw new NoTaskScheduledException("No task scheduled with name: " + helixJobName);
+    }
+    Set<String> selectedSubtasks = new HashSet<>();
+    if (StringUtils.isNotEmpty(subtaskNames)) {
+      Collections.addAll(selectedSubtasks, StringUtils.split(subtaskNames, ','));
+    }
+    Map<String, String> allSubtaskWorkerMap = new HashMap<>();
+    Map<String, Set<String>> workerSelectedSubtasksMap = new HashMap<>();
+    for (int partition : jobContext.getPartitionSet()) {
+      String subtaskName = jobContext.getTaskIdForPartition(partition);
+      String worker = jobContext.getAssignedParticipant(partition);
+      allSubtaskWorkerMap.put(subtaskName, worker);
+      if (selectedSubtasks.isEmpty() || selectedSubtasks.contains(subtaskName)) {
+        workerSelectedSubtasksMap.computeIfAbsent(worker, k -> new HashSet<>()).add(subtaskName);
+      }
+    }
+    LOGGER.debug("Found subtasks on workers: {}", workerSelectedSubtasksMap);

Review Comment:
   We pass the Map reference to debug(), so the map is not serialized unless debug logging is enabled, as in Log4j's logIfEnabled(...):
   ```
   if (isEnabled(level, marker, message, p0, p1)) {
       logMessage(fqcn, level, marker, message, p0, p1);
   }
   ```
   
   Anyway, this code path is not performance-critical, so it should be fine to skip the explicit isDebugEnabled() check in exchange for a bit more readability.
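
The deferred-serialization point can be checked with a small standalone sketch. It is only an illustration under stated assumptions: it uses the JDK's java.util.logging (with Level.FINE standing in for debug) rather than the logger used in the PR, and `LazyLogArgumentDemo`, `CountingMap`, and `toStringCallsWhenLoggerAt` are hypothetical names invented for this example. The map counts its own `toString()` calls, so the count shows whether passing the reference alone triggers serialization.

```java
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;

public class LazyLogArgumentDemo {
    /** Hypothetical helper: a map that counts how often it is rendered to a string. */
    static class CountingMap extends HashMap<String, String> {
        final AtomicInteger toStringCalls = new AtomicInteger();

        @Override
        public String toString() {
            toStringCalls.incrementAndGet();
            return super.toString();
        }
    }

    /** Logs one FINE message with the map as a parameter; returns how often toString() ran. */
    static int toStringCallsWhenLoggerAt(Level loggerLevel) {
        Logger logger = Logger.getAnonymousLogger();
        logger.setUseParentHandlers(false); // keep the demo quiet on the console
        logger.setLevel(loggerLevel);
        logger.addHandler(new Handler() {
            private final SimpleFormatter formatter = new SimpleFormatter();

            @Override
            public void publish(LogRecord record) {
                // Formatting is the step that actually renders the parameters.
                formatter.formatMessage(record);
            }

            @Override
            public void flush() {
            }

            @Override
            public void close() {
            }
        });

        CountingMap workerSubtasks = new CountingMap();
        workerSubtasks.put("subtask_0", "worker_1");

        // Only the reference is passed; the logger checks the level before formatting.
        logger.log(Level.FINE, "Found subtasks on workers: {0}", workerSubtasks);
        return workerSubtasks.toStringCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("toString() calls with FINE disabled: " + toStringCallsWhenLoggerAt(Level.INFO));
        System.out.println("toString() calls with FINE enabled: " + toStringCallsWhenLoggerAt(Level.FINE));
    }
}
```

With the level check done inside the logger, an explicit guard around the call would mainly help when building the argument itself is expensive, which is not the case when an existing map reference is passed.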



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: commits-help@pinot.apache.org