You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2022/09/29 00:25:41 UTC

[pinot] branch master updated: refine the minion task progress api a bit (#9482)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ae239cd100 refine the minion task progress api a bit (#9482)
ae239cd100 is described below

commit ae239cd10056330c008d23b1faaec158d660446c
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Wed Sep 28 17:25:34 2022 -0700

    refine the minion task progress api a bit (#9482)
    
    * refine the minion task progress api a bit
    
    * fix unit test
---
 .../core/minion/PinotHelixTaskResourceManager.java    | 13 ++++++++-----
 .../minion/PinotHelixTaskResourceManagerTest.java     | 19 +++++++++++++------
 .../pinot/minion/event/MinionProgressObserver.java    |  4 ++--
 .../pinot/minion/taskfactory/TaskFactoryRegistry.java |  8 ++++----
 .../java/org/apache/pinot/spi/utils/StringUtil.java   | 13 -------------
 5 files changed, 27 insertions(+), 30 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index fb28be3115..85b913c471 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -516,6 +516,10 @@ public class PinotHelixTaskResourceManager {
       }
     }
     // Check if any subtask missed their progress from the worker.
+    // And simply check all subtasks if no subtasks are specified.
+    if (selectedSubtasks.isEmpty()) {
+      selectedSubtasks = allSubtasks.keySet();
+    }
     for (String subtaskName : selectedSubtasks) {
       if (subtaskProgressMap.containsKey(subtaskName)) {
         continue;
@@ -530,12 +534,11 @@ public class PinotHelixTaskResourceManager {
             String.format("No status from worker: %s. Got status: %s from Helix", taskWorker, helixState));
       }
     }
-    // Raise error if any worker failed to report progress, with partial result.
     if (serviceResponse._failedResponseCount > 0) {
-      // TODO: track detailed worker failure via CompletionServiceHelper and send them back in response.
-      throw new RuntimeException(String
-          .format("There were %d workers failed to report task progress. Got partial progress info: %s",
-              serviceResponse._failedResponseCount, subtaskProgressMap));
+      // Subtasks without worker side progress are filled with status tracked by Helix so return them back.
+      // The detailed worker failure response is logged as error by CompletionServiceResponse for debugging.
+      LOGGER.warn("There were {} workers failed to report task progress. Got partial progress info: {}",
+          serviceResponse._failedResponseCount, subtaskProgressMap);
     }
     return subtaskProgressMap;
   }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
index fd601a21bd..fb3e8412f0 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
@@ -29,6 +29,7 @@ import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskPartitionState;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.util.CompletionServiceHelper;
+import org.apache.pinot.spi.utils.JsonUtils;
 import org.testng.annotations.Test;
 
 import static org.mockito.ArgumentMatchers.any;
@@ -81,6 +82,7 @@ public class PinotHelixTaskResourceManagerTest {
         new CompletionServiceHelper.CompletionServiceResponse();
     when(httpHelper.doMultiGetRequest(any(), any(), anyBoolean(), any(), anyInt())).thenReturn(httpResp);
     // Three workers to run 3 subtasks but got no progress status from workers.
+    httpResp._failedResponseCount = 3;
     String[] workers = new String[]{"worker01", "worker02", "worker03"};
     Map<String, String> workerEndpoints = new HashMap<>();
     for (String worker : workers) {
@@ -108,8 +110,8 @@ public class PinotHelixTaskResourceManagerTest {
     }
   }
 
-  @Test(expectedExceptions = RuntimeException.class)
-  public void testGetSubtaskProgressWithFailure()
+  @Test
+  public void testGetSubtaskProgressWithResponse()
       throws Exception {
     TaskDriver taskDriver = mock(TaskDriver.class);
     JobContext jobContext = mock(JobContext.class);
@@ -120,8 +122,6 @@ public class PinotHelixTaskResourceManagerTest {
     CompletionServiceHelper.CompletionServiceResponse httpResp =
         new CompletionServiceHelper.CompletionServiceResponse();
     when(httpHelper.doMultiGetRequest(any(), any(), anyBoolean(), any(), anyInt())).thenReturn(httpResp);
-    // Three workers to run 3 subtasks but got failure.
-    httpResp._failedResponseCount = 3;
     String[] workers = new String[]{"worker01", "worker02", "worker03"};
     Map<String, String> workerEndpoints = new HashMap<>();
     for (String worker : workers) {
@@ -133,6 +133,8 @@ public class PinotHelixTaskResourceManagerTest {
     for (int i = 0; i < 3; i++) {
       subtaskIds.add(i);
       subtaskNames[i] = taskName + "_" + i;
+      httpResp._httpResponses.put(workers[i],
+          JsonUtils.objectToString(Collections.singletonMap(subtaskNames[i], "running on worker: " + i)));
     }
     TaskPartitionState[] helixStates =
         new TaskPartitionState[]{TaskPartitionState.INIT, TaskPartitionState.RUNNING, TaskPartitionState.TASK_ERROR};
@@ -140,7 +142,12 @@ public class PinotHelixTaskResourceManagerTest {
     when(jobContext.getAssignedParticipant(anyInt())).thenReturn(workers[0], workers[1], workers[2]);
     when(jobContext.getPartitionState(anyInt())).thenReturn(helixStates[0], helixStates[1], helixStates[2]);
     when(jobContext.getPartitionSet()).thenReturn(subtaskIds);
-    mgr.getSubtaskProgress(taskName, StringUtils.join(subtaskNames, ','), httpHelper, workerEndpoints,
-        Collections.emptyMap(), 1000);
+    Map<String, Object> progress =
+        mgr.getSubtaskProgress(taskName, StringUtils.join(subtaskNames, ','), httpHelper, workerEndpoints,
+            Collections.emptyMap(), 1000);
+    for (int i = 0; i < 3; i++) {
+      String taskProgress = (String) progress.get(subtaskNames[i]);
+      assertEquals(taskProgress, "running on worker: " + i);
+    }
   }
 }
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java
index f37e3a6f55..1cc03eec63 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java
@@ -24,8 +24,8 @@ import java.util.LinkedList;
 import java.util.List;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.pinot.core.minion.PinotTaskConfig;
-import org.apache.pinot.spi.utils.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -94,7 +94,7 @@ public class MinionProgressObserver extends DefaultMinionEventObserver {
   @Override
   public synchronized void notifyTaskError(PinotTaskConfig pinotTaskConfig, Exception e) {
     long endTs = System.currentTimeMillis();
-    addStatus(endTs, "Task failed in " + (endTs - _startTs) + "ms with error: " + StringUtil.getStackTraceAsString(e));
+    addStatus(endTs, "Task failed in " + (endTs - _startTs) + "ms with error: " + ExceptionUtils.getStackTrace(e));
     super.notifyTaskError(pinotTaskConfig, e);
   }
 
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java
index 7d95d73dbc..4fa1a144ea 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.Task;
@@ -46,7 +47,6 @@ import org.apache.pinot.minion.exception.TaskCancelledException;
 import org.apache.pinot.minion.executor.PinotTaskExecutor;
 import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
 import org.apache.pinot.minion.executor.TaskExecutorFactoryRegistry;
-import org.apache.pinot.spi.utils.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -127,17 +127,17 @@ public class TaskFactoryRegistry {
                 _eventObserver.notifyTaskCancelled(pinotTaskConfig);
                 _minionMetrics.addMeteredTableValue(taskType, MinionMeter.NUMBER_TASKS_CANCELLED, 1L);
                 LOGGER.info("Task: {} got cancelled", _taskConfig.getId(), e);
-                return new TaskResult(TaskResult.Status.CANCELED, StringUtil.getStackTraceAsString(e));
+                return new TaskResult(TaskResult.Status.CANCELED, ExceptionUtils.getStackTrace(e));
               } catch (FatalException e) {
                 _eventObserver.notifyTaskError(pinotTaskConfig, e);
                 _minionMetrics.addMeteredTableValue(taskType, MinionMeter.NUMBER_TASKS_FATAL_FAILED, 1L);
                 LOGGER.error("Caught fatal exception while executing task: {}", _taskConfig.getId(), e);
-                return new TaskResult(TaskResult.Status.FATAL_FAILED, StringUtil.getStackTraceAsString(e));
+                return new TaskResult(TaskResult.Status.FATAL_FAILED, ExceptionUtils.getStackTrace(e));
               } catch (Exception e) {
                 _eventObserver.notifyTaskError(pinotTaskConfig, e);
                 _minionMetrics.addMeteredTableValue(taskType, MinionMeter.NUMBER_TASKS_FAILED, 1L);
                 LOGGER.error("Caught exception while executing task: {}", _taskConfig.getId(), e);
-                return new TaskResult(TaskResult.Status.FAILED, StringUtil.getStackTraceAsString(e));
+                return new TaskResult(TaskResult.Status.FAILED, ExceptionUtils.getStackTrace(e));
               }
             }
 
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/StringUtil.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/StringUtil.java
index c4c6649924..d7d2d25875 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/StringUtil.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/StringUtil.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pinot.spi.utils;
 
-import java.io.PrintWriter;
-import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.commons.lang.ArrayUtils;
@@ -87,15 +85,4 @@ public class StringUtil {
     }
     return value.substring(0, Math.min(index, maxLength));
   }
-
-  /**
-   * Get the exception full stack track as String.
-   */
-  public static String getStackTraceAsString(Exception exp) {
-    StringWriter expStr = new StringWriter();
-    try (PrintWriter pw = new PrintWriter(expStr)) {
-      exp.printStackTrace(pw);
-    }
-    return expStr.toString();
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org