You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain text copy.
Posted to commits@pinot.apache.org by sn...@apache.org on 2022/12/15 07:53:29 UTC

[pinot] branch master updated: handle pending minion tasks properly when getting the task progress status (#9911)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new cf4356738e handle pending minion tasks properly when getting the task progress status (#9911)
cf4356738e is described below

commit cf4356738e6493b04c22416856f4ebbdcf9d54ae
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Wed Dec 14 23:53:22 2022 -0800

    handle pending minion tasks properly when getting the task progress status (#9911)
    
    * handle pending tasks properly when getting task progress status
    
    * add test
---
 .../core/minion/PinotHelixTaskResourceManager.java | 45 ++++++++++++---------
 .../minion/PinotHelixTaskResourceManagerTest.java  | 47 +++++++++++++++++++++-
 2 files changed, 72 insertions(+), 20 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index 48e3717a72..4a2c0bad3c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -517,27 +517,41 @@ public class PinotHelixTaskResourceManager {
     for (int partition : jobContext.getPartitionSet()) {
       String subtaskName = jobContext.getTaskIdForPartition(partition);
       String worker = jobContext.getAssignedParticipant(partition);
-      allSubtasks.put(subtaskName, new String[]{worker, jobContext.getPartitionState(partition).name()});
+      TaskPartitionState partitionState = jobContext.getPartitionState(partition);
+      String taskState = partitionState == null ? null : partitionState.name();
+      allSubtasks.put(subtaskName, new String[]{worker, taskState});
+      LOGGER.debug("Subtask: {} is assigned to worker: {} with state: {} in Helix", subtaskName, worker, taskState);
+      if (worker == null) {
+        continue;
+      }
       if (selectedSubtasks.isEmpty() || selectedSubtasks.contains(subtaskName)) {
         workerSelectedSubtasksMap.computeIfAbsent(worker, k -> new HashSet<>()).add(subtaskName);
       }
     }
     LOGGER.debug("Found subtasks on workers: {}", workerSelectedSubtasksMap);
     List<String> workerUrls = new ArrayList<>();
-    workerSelectedSubtasksMap.forEach((workerId, subtasksOnWorker) -> workerUrls.add(String
-        .format("%s/tasks/subtask/progress?subtaskNames=%s", workerEndpoints.get(workerId),
+    workerSelectedSubtasksMap.forEach((workerId, subtasksOnWorker) -> workerUrls.add(
+        String.format("%s/tasks/subtask/progress?subtaskNames=%s", workerEndpoints.get(workerId),
             StringUtils.join(subtasksOnWorker, CommonConstants.Minion.TASK_LIST_SEPARATOR))));
     LOGGER.debug("Getting task progress with workerUrls: {}", workerUrls);
     // Scatter and gather progress from multiple workers.
     Map<String, Object> subtaskProgressMap = new HashMap<>();
-    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
-        completionServiceHelper.doMultiGetRequest(workerUrls, null, true, requestHeaders, timeoutMs);
-    for (Map.Entry<String, String> entry : serviceResponse._httpResponses.entrySet()) {
-      String worker = entry.getKey();
-      String resp = entry.getValue();
-      LOGGER.debug("Got resp: {} from worker: {}", resp, worker);
-      if (StringUtils.isNotEmpty(resp)) {
-        subtaskProgressMap.putAll(JsonUtils.stringToObject(resp, Map.class));
+    if (!workerUrls.isEmpty()) {
+      CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+          completionServiceHelper.doMultiGetRequest(workerUrls, null, true, requestHeaders, timeoutMs);
+      for (Map.Entry<String, String> entry : serviceResponse._httpResponses.entrySet()) {
+        String worker = entry.getKey();
+        String resp = entry.getValue();
+        LOGGER.debug("Got resp: {} from worker: {}", resp, worker);
+        if (StringUtils.isNotEmpty(resp)) {
+          subtaskProgressMap.putAll(JsonUtils.stringToObject(resp, Map.class));
+        }
+      }
+      if (serviceResponse._failedResponseCount > 0) {
+        // Instead of aborting, subtasks without worker side progress return the task status tracked by Helix.
+        // The detailed worker failure response is logged as error by CompletionServiceResponse for debugging.
+        LOGGER.warn("There were {} workers failed to report task progress. Got partial progress info: {}",
+            serviceResponse._failedResponseCount, subtaskProgressMap);
       }
     }
     // Check if any subtask missed their progress from the worker.
@@ -549,8 +563,9 @@ public class PinotHelixTaskResourceManager {
       if (subtaskProgressMap.containsKey(subtaskName)) {
         continue;
       }
+      // Return the task progress status tracked by Helix.
       String[] taskWorkerAndHelixState = allSubtasks.get(subtaskName);
-      if (taskWorkerAndHelixState == null) {
+      if (taskWorkerAndHelixState == null || taskWorkerAndHelixState[0] == null) {
         subtaskProgressMap.put(subtaskName, "No worker has run this subtask");
       } else {
         String taskWorker = taskWorkerAndHelixState[0];
@@ -559,12 +574,6 @@ public class PinotHelixTaskResourceManager {
             String.format("No status from worker: %s. Got status: %s from Helix", taskWorker, helixState));
       }
     }
-    if (serviceResponse._failedResponseCount > 0) {
-      // Subtasks without worker side progress are filled with status tracked by Helix so return them back.
-      // The detailed worker failure response is logged as error by CompletionServiceResponse for debugging.
-      LOGGER.warn("There were {} workers failed to report task progress. Got partial progress info: {}",
-          serviceResponse._failedResponseCount, subtaskProgressMap);
-    }
     return subtaskProgressMap;
   }
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
index fb3e8412f0..bf354d5622 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
@@ -83,7 +83,7 @@ public class PinotHelixTaskResourceManagerTest {
     when(httpHelper.doMultiGetRequest(any(), any(), anyBoolean(), any(), anyInt())).thenReturn(httpResp);
     // Three workers to run 3 subtasks but got no progress status from workers.
     httpResp._failedResponseCount = 3;
-    String[] workers = new String[]{"worker01", "worker02", "worker03"};
+    String[] workers = new String[]{"worker0", "worker1", "worker2"};
     Map<String, String> workerEndpoints = new HashMap<>();
     for (String worker : workers) {
       workerEndpoints.put(worker, "http://" + worker + ":9000");
@@ -122,7 +122,7 @@ public class PinotHelixTaskResourceManagerTest {
     CompletionServiceHelper.CompletionServiceResponse httpResp =
         new CompletionServiceHelper.CompletionServiceResponse();
     when(httpHelper.doMultiGetRequest(any(), any(), anyBoolean(), any(), anyInt())).thenReturn(httpResp);
-    String[] workers = new String[]{"worker01", "worker02", "worker03"};
+    String[] workers = new String[]{"worker0", "worker1", "worker2"};
     Map<String, String> workerEndpoints = new HashMap<>();
     for (String worker : workers) {
       workerEndpoints.put(worker, "http://" + worker + ":9000");
@@ -150,4 +150,47 @@ public class PinotHelixTaskResourceManagerTest {
       assertEquals(taskProgress, "running on worker: " + i);
     }
   }
+
+  @Test
+  public void testGetSubtaskProgressPending()
+      throws Exception {
+    TaskDriver taskDriver = mock(TaskDriver.class);
+    JobContext jobContext = mock(JobContext.class);
+    when(taskDriver.getJobContext(anyString())).thenReturn(jobContext);
+    PinotHelixTaskResourceManager mgr =
+        new PinotHelixTaskResourceManager(mock(PinotHelixResourceManager.class), taskDriver);
+    CompletionServiceHelper httpHelper = mock(CompletionServiceHelper.class);
+    CompletionServiceHelper.CompletionServiceResponse httpResp =
+        new CompletionServiceHelper.CompletionServiceResponse();
+    when(httpHelper.doMultiGetRequest(any(), any(), anyBoolean(), any(), anyInt())).thenReturn(httpResp);
+    String[] workers = new String[]{"worker0", "worker1", "worker2"};
+    Map<String, String> workerEndpoints = new HashMap<>();
+    for (String worker : workers) {
+      workerEndpoints.put(worker, "http://" + worker + ":9000");
+    }
+    String taskName = "Task_SegmentGenerationAndPushTask_someone";
+    String[] subtaskNames = new String[3];
+    Set<Integer> subtaskIds = new HashSet<>();
+    for (int i = 0; i < 3; i++) {
+      subtaskIds.add(i);
+      subtaskNames[i] = taskName + "_" + i;
+    }
+    // Some subtasks are pending to be run.
+    TaskPartitionState[] helixStates = new TaskPartitionState[]{TaskPartitionState.RUNNING, null, null};
+    httpResp._httpResponses.put(workers[0],
+        JsonUtils.objectToString(Collections.singletonMap(subtaskNames[0], "running on worker: 0")));
+    when(jobContext.getTaskIdForPartition(anyInt())).thenReturn(subtaskNames[0], subtaskNames[1], subtaskNames[2]);
+    when(jobContext.getAssignedParticipant(anyInt())).thenReturn(workers[0], null, null);
+    when(jobContext.getPartitionState(anyInt())).thenReturn(helixStates[0], null, null);
+    when(jobContext.getPartitionSet()).thenReturn(subtaskIds);
+    Map<String, Object> progress =
+        mgr.getSubtaskProgress(taskName, StringUtils.join(subtaskNames, ','), httpHelper, workerEndpoints,
+            Collections.emptyMap(), 1000);
+    String taskProgress = (String) progress.get(subtaskNames[0]);
+    assertEquals(taskProgress, "running on worker: 0");
+    taskProgress = (String) progress.get(subtaskNames[1]);
+    assertEquals(taskProgress, "No worker has run this subtask");
+    taskProgress = (String) progress.get(subtaskNames[2]);
+    assertEquals(taskProgress, "No worker has run this subtask");
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org