You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2022/09/29 00:25:41 UTC
[pinot] branch master updated: refine the minion task progress api a bit (#9482)
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ae239cd100 refine the minion task progress api a bit (#9482)
ae239cd100 is described below
commit ae239cd10056330c008d23b1faaec158d660446c
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Wed Sep 28 17:25:34 2022 -0700
refine the minion task progress api a bit (#9482)
* refine the minion task progress api a bit
* fix ut
---
.../core/minion/PinotHelixTaskResourceManager.java | 13 ++++++++-----
.../minion/PinotHelixTaskResourceManagerTest.java | 19 +++++++++++++------
.../pinot/minion/event/MinionProgressObserver.java | 4 ++--
.../pinot/minion/taskfactory/TaskFactoryRegistry.java | 8 ++++----
.../java/org/apache/pinot/spi/utils/StringUtil.java | 13 -------------
5 files changed, 27 insertions(+), 30 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index fb28be3115..85b913c471 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -516,6 +516,10 @@ public class PinotHelixTaskResourceManager {
}
}
// Check if any subtask missed their progress from the worker.
+ // If no subtasks are specified, check all subtasks.
+ if (selectedSubtasks.isEmpty()) {
+ selectedSubtasks = allSubtasks.keySet();
+ }
for (String subtaskName : selectedSubtasks) {
if (subtaskProgressMap.containsKey(subtaskName)) {
continue;
@@ -530,12 +534,11 @@ public class PinotHelixTaskResourceManager {
String.format("No status from worker: %s. Got status: %s from Helix", taskWorker, helixState));
}
}
- // Raise error if any worker failed to report progress, with partial result.
if (serviceResponse._failedResponseCount > 0) {
- // TODO: track detailed worker failure via CompletionServiceHelper and send them back in response.
- throw new RuntimeException(String
- .format("There were %d workers failed to report task progress. Got partial progress info: %s",
- serviceResponse._failedResponseCount, subtaskProgressMap));
+ // Subtasks without worker-side progress are filled in with the status tracked by Helix, so return them anyway.
+ // The detailed worker failure response is logged as error by CompletionServiceResponse for debugging.
+ LOGGER.warn("There were {} workers failed to report task progress. Got partial progress info: {}",
+ serviceResponse._failedResponseCount, subtaskProgressMap);
}
return subtaskProgressMap;
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
index fd601a21bd..fb3e8412f0 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
@@ -29,6 +29,7 @@ import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskPartitionState;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.util.CompletionServiceHelper;
+import org.apache.pinot.spi.utils.JsonUtils;
import org.testng.annotations.Test;
import static org.mockito.ArgumentMatchers.any;
@@ -81,6 +82,7 @@ public class PinotHelixTaskResourceManagerTest {
new CompletionServiceHelper.CompletionServiceResponse();
when(httpHelper.doMultiGetRequest(any(), any(), anyBoolean(), any(), anyInt())).thenReturn(httpResp);
// Three workers to run 3 subtasks but got no progress status from workers.
+ httpResp._failedResponseCount = 3;
String[] workers = new String[]{"worker01", "worker02", "worker03"};
Map<String, String> workerEndpoints = new HashMap<>();
for (String worker : workers) {
@@ -108,8 +110,8 @@ public class PinotHelixTaskResourceManagerTest {
}
}
- @Test(expectedExceptions = RuntimeException.class)
- public void testGetSubtaskProgressWithFailure()
+ @Test
+ public void testGetSubtaskProgressWithResponse()
throws Exception {
TaskDriver taskDriver = mock(TaskDriver.class);
JobContext jobContext = mock(JobContext.class);
@@ -120,8 +122,6 @@ public class PinotHelixTaskResourceManagerTest {
CompletionServiceHelper.CompletionServiceResponse httpResp =
new CompletionServiceHelper.CompletionServiceResponse();
when(httpHelper.doMultiGetRequest(any(), any(), anyBoolean(), any(), anyInt())).thenReturn(httpResp);
- // Three workers to run 3 subtasks but got failure.
- httpResp._failedResponseCount = 3;
String[] workers = new String[]{"worker01", "worker02", "worker03"};
Map<String, String> workerEndpoints = new HashMap<>();
for (String worker : workers) {
@@ -133,6 +133,8 @@ public class PinotHelixTaskResourceManagerTest {
for (int i = 0; i < 3; i++) {
subtaskIds.add(i);
subtaskNames[i] = taskName + "_" + i;
+ httpResp._httpResponses.put(workers[i],
+ JsonUtils.objectToString(Collections.singletonMap(subtaskNames[i], "running on worker: " + i)));
}
TaskPartitionState[] helixStates =
new TaskPartitionState[]{TaskPartitionState.INIT, TaskPartitionState.RUNNING, TaskPartitionState.TASK_ERROR};
@@ -140,7 +142,12 @@ public class PinotHelixTaskResourceManagerTest {
when(jobContext.getAssignedParticipant(anyInt())).thenReturn(workers[0], workers[1], workers[2]);
when(jobContext.getPartitionState(anyInt())).thenReturn(helixStates[0], helixStates[1], helixStates[2]);
when(jobContext.getPartitionSet()).thenReturn(subtaskIds);
- mgr.getSubtaskProgress(taskName, StringUtils.join(subtaskNames, ','), httpHelper, workerEndpoints,
- Collections.emptyMap(), 1000);
+ Map<String, Object> progress =
+ mgr.getSubtaskProgress(taskName, StringUtils.join(subtaskNames, ','), httpHelper, workerEndpoints,
+ Collections.emptyMap(), 1000);
+ for (int i = 0; i < 3; i++) {
+ String taskProgress = (String) progress.get(subtaskNames[i]);
+ assertEquals(taskProgress, "running on worker: " + i);
+ }
}
}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java
index f37e3a6f55..1cc03eec63 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java
@@ -24,8 +24,8 @@ import java.util.LinkedList;
import java.util.List;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.pinot.core.minion.PinotTaskConfig;
-import org.apache.pinot.spi.utils.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,7 +94,7 @@ public class MinionProgressObserver extends DefaultMinionEventObserver {
@Override
public synchronized void notifyTaskError(PinotTaskConfig pinotTaskConfig, Exception e) {
long endTs = System.currentTimeMillis();
- addStatus(endTs, "Task failed in " + (endTs - _startTs) + "ms with error: " + StringUtil.getStackTraceAsString(e));
+ addStatus(endTs, "Task failed in " + (endTs - _startTs) + "ms with error: " + ExceptionUtils.getStackTrace(e));
super.notifyTaskError(pinotTaskConfig, e);
}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java
index 7d95d73dbc..4fa1a144ea 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/taskfactory/TaskFactoryRegistry.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.Task;
@@ -46,7 +47,6 @@ import org.apache.pinot.minion.exception.TaskCancelledException;
import org.apache.pinot.minion.executor.PinotTaskExecutor;
import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
import org.apache.pinot.minion.executor.TaskExecutorFactoryRegistry;
-import org.apache.pinot.spi.utils.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -127,17 +127,17 @@ public class TaskFactoryRegistry {
_eventObserver.notifyTaskCancelled(pinotTaskConfig);
_minionMetrics.addMeteredTableValue(taskType, MinionMeter.NUMBER_TASKS_CANCELLED, 1L);
LOGGER.info("Task: {} got cancelled", _taskConfig.getId(), e);
- return new TaskResult(TaskResult.Status.CANCELED, StringUtil.getStackTraceAsString(e));
+ return new TaskResult(TaskResult.Status.CANCELED, ExceptionUtils.getStackTrace(e));
} catch (FatalException e) {
_eventObserver.notifyTaskError(pinotTaskConfig, e);
_minionMetrics.addMeteredTableValue(taskType, MinionMeter.NUMBER_TASKS_FATAL_FAILED, 1L);
LOGGER.error("Caught fatal exception while executing task: {}", _taskConfig.getId(), e);
- return new TaskResult(TaskResult.Status.FATAL_FAILED, StringUtil.getStackTraceAsString(e));
+ return new TaskResult(TaskResult.Status.FATAL_FAILED, ExceptionUtils.getStackTrace(e));
} catch (Exception e) {
_eventObserver.notifyTaskError(pinotTaskConfig, e);
_minionMetrics.addMeteredTableValue(taskType, MinionMeter.NUMBER_TASKS_FAILED, 1L);
LOGGER.error("Caught exception while executing task: {}", _taskConfig.getId(), e);
- return new TaskResult(TaskResult.Status.FAILED, StringUtil.getStackTraceAsString(e));
+ return new TaskResult(TaskResult.Status.FAILED, ExceptionUtils.getStackTrace(e));
}
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/StringUtil.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/StringUtil.java
index c4c6649924..d7d2d25875 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/StringUtil.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/StringUtil.java
@@ -18,8 +18,6 @@
*/
package org.apache.pinot.spi.utils;
-import java.io.PrintWriter;
-import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.ArrayUtils;
@@ -87,15 +85,4 @@ public class StringUtil {
}
return value.substring(0, Math.min(index, maxLength));
}
-
- /**
- * Get the exception full stack track as String.
- */
- public static String getStackTraceAsString(Exception exp) {
- StringWriter expStr = new StringWriter();
- try (PrintWriter pw = new PrintWriter(expStr)) {
- exp.printStackTrace(pw);
- }
- return expStr.toString();
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org