You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2022/12/15 07:53:29 UTC
[pinot] branch master updated: handle pending minion tasks properly when getting the task progress status (#9911)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new cf4356738e handle pending minion tasks properly when getting the task progress status (#9911)
cf4356738e is described below
commit cf4356738e6493b04c22416856f4ebbdcf9d54ae
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Wed Dec 14 23:53:22 2022 -0800
handle pending minion tasks properly when getting the task progress status (#9911)
* handle pending tasks properly when getting task progress status
* add test
---
.../core/minion/PinotHelixTaskResourceManager.java | 45 ++++++++++++---------
.../minion/PinotHelixTaskResourceManagerTest.java | 47 +++++++++++++++++++++-
2 files changed, 72 insertions(+), 20 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index 48e3717a72..4a2c0bad3c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -517,27 +517,41 @@ public class PinotHelixTaskResourceManager {
for (int partition : jobContext.getPartitionSet()) {
String subtaskName = jobContext.getTaskIdForPartition(partition);
String worker = jobContext.getAssignedParticipant(partition);
- allSubtasks.put(subtaskName, new String[]{worker, jobContext.getPartitionState(partition).name()});
+ TaskPartitionState partitionState = jobContext.getPartitionState(partition);
+ String taskState = partitionState == null ? null : partitionState.name();
+ allSubtasks.put(subtaskName, new String[]{worker, taskState});
+ LOGGER.debug("Subtask: {} is assigned to worker: {} with state: {} in Helix", subtaskName, worker, taskState);
+ if (worker == null) {
+ continue;
+ }
if (selectedSubtasks.isEmpty() || selectedSubtasks.contains(subtaskName)) {
workerSelectedSubtasksMap.computeIfAbsent(worker, k -> new HashSet<>()).add(subtaskName);
}
}
LOGGER.debug("Found subtasks on workers: {}", workerSelectedSubtasksMap);
List<String> workerUrls = new ArrayList<>();
- workerSelectedSubtasksMap.forEach((workerId, subtasksOnWorker) -> workerUrls.add(String
- .format("%s/tasks/subtask/progress?subtaskNames=%s", workerEndpoints.get(workerId),
+ workerSelectedSubtasksMap.forEach((workerId, subtasksOnWorker) -> workerUrls.add(
+ String.format("%s/tasks/subtask/progress?subtaskNames=%s", workerEndpoints.get(workerId),
StringUtils.join(subtasksOnWorker, CommonConstants.Minion.TASK_LIST_SEPARATOR))));
LOGGER.debug("Getting task progress with workerUrls: {}", workerUrls);
// Scatter and gather progress from multiple workers.
Map<String, Object> subtaskProgressMap = new HashMap<>();
- CompletionServiceHelper.CompletionServiceResponse serviceResponse =
- completionServiceHelper.doMultiGetRequest(workerUrls, null, true, requestHeaders, timeoutMs);
- for (Map.Entry<String, String> entry : serviceResponse._httpResponses.entrySet()) {
- String worker = entry.getKey();
- String resp = entry.getValue();
- LOGGER.debug("Got resp: {} from worker: {}", resp, worker);
- if (StringUtils.isNotEmpty(resp)) {
- subtaskProgressMap.putAll(JsonUtils.stringToObject(resp, Map.class));
+ if (!workerUrls.isEmpty()) {
+ CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+ completionServiceHelper.doMultiGetRequest(workerUrls, null, true, requestHeaders, timeoutMs);
+ for (Map.Entry<String, String> entry : serviceResponse._httpResponses.entrySet()) {
+ String worker = entry.getKey();
+ String resp = entry.getValue();
+ LOGGER.debug("Got resp: {} from worker: {}", resp, worker);
+ if (StringUtils.isNotEmpty(resp)) {
+ subtaskProgressMap.putAll(JsonUtils.stringToObject(resp, Map.class));
+ }
+ }
+ if (serviceResponse._failedResponseCount > 0) {
+ // Instead of aborting, subtasks with no worker-side progress fall back to the task status tracked by Helix.
+ // The detailed worker failure response is logged as error by CompletionServiceResponse for debugging.
+ LOGGER.warn("There were {} workers failed to report task progress. Got partial progress info: {}",
+ serviceResponse._failedResponseCount, subtaskProgressMap);
}
}
// Check if any subtask missed their progress from the worker.
@@ -549,8 +563,9 @@ public class PinotHelixTaskResourceManager {
if (subtaskProgressMap.containsKey(subtaskName)) {
continue;
}
+ // Return the task progress status tracked by Helix.
String[] taskWorkerAndHelixState = allSubtasks.get(subtaskName);
- if (taskWorkerAndHelixState == null) {
+ if (taskWorkerAndHelixState == null || taskWorkerAndHelixState[0] == null) {
subtaskProgressMap.put(subtaskName, "No worker has run this subtask");
} else {
String taskWorker = taskWorkerAndHelixState[0];
@@ -559,12 +574,6 @@ public class PinotHelixTaskResourceManager {
String.format("No status from worker: %s. Got status: %s from Helix", taskWorker, helixState));
}
}
- if (serviceResponse._failedResponseCount > 0) {
- // Subtasks without worker side progress are filled with status tracked by Helix so return them back.
- // The detailed worker failure response is logged as error by CompletionServiceResponse for debugging.
- LOGGER.warn("There were {} workers failed to report task progress. Got partial progress info: {}",
- serviceResponse._failedResponseCount, subtaskProgressMap);
- }
return subtaskProgressMap;
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
index fb3e8412f0..bf354d5622 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
@@ -83,7 +83,7 @@ public class PinotHelixTaskResourceManagerTest {
when(httpHelper.doMultiGetRequest(any(), any(), anyBoolean(), any(), anyInt())).thenReturn(httpResp);
// Three workers to run 3 subtasks but got no progress status from workers.
httpResp._failedResponseCount = 3;
- String[] workers = new String[]{"worker01", "worker02", "worker03"};
+ String[] workers = new String[]{"worker0", "worker1", "worker2"};
Map<String, String> workerEndpoints = new HashMap<>();
for (String worker : workers) {
workerEndpoints.put(worker, "http://" + worker + ":9000");
@@ -122,7 +122,7 @@ public class PinotHelixTaskResourceManagerTest {
CompletionServiceHelper.CompletionServiceResponse httpResp =
new CompletionServiceHelper.CompletionServiceResponse();
when(httpHelper.doMultiGetRequest(any(), any(), anyBoolean(), any(), anyInt())).thenReturn(httpResp);
- String[] workers = new String[]{"worker01", "worker02", "worker03"};
+ String[] workers = new String[]{"worker0", "worker1", "worker2"};
Map<String, String> workerEndpoints = new HashMap<>();
for (String worker : workers) {
workerEndpoints.put(worker, "http://" + worker + ":9000");
@@ -150,4 +150,47 @@ public class PinotHelixTaskResourceManagerTest {
assertEquals(taskProgress, "running on worker: " + i);
}
}
+
+ @Test
+ public void testGetSubtaskProgressPending()
+ throws Exception {
+ TaskDriver taskDriver = mock(TaskDriver.class);
+ JobContext jobContext = mock(JobContext.class);
+ when(taskDriver.getJobContext(anyString())).thenReturn(jobContext);
+ PinotHelixTaskResourceManager mgr =
+ new PinotHelixTaskResourceManager(mock(PinotHelixResourceManager.class), taskDriver);
+ CompletionServiceHelper httpHelper = mock(CompletionServiceHelper.class);
+ CompletionServiceHelper.CompletionServiceResponse httpResp =
+ new CompletionServiceHelper.CompletionServiceResponse();
+ when(httpHelper.doMultiGetRequest(any(), any(), anyBoolean(), any(), anyInt())).thenReturn(httpResp);
+ String[] workers = new String[]{"worker0", "worker1", "worker2"};
+ Map<String, String> workerEndpoints = new HashMap<>();
+ for (String worker : workers) {
+ workerEndpoints.put(worker, "http://" + worker + ":9000");
+ }
+ String taskName = "Task_SegmentGenerationAndPushTask_someone";
+ String[] subtaskNames = new String[3];
+ Set<Integer> subtaskIds = new HashSet<>();
+ for (int i = 0; i < 3; i++) {
+ subtaskIds.add(i);
+ subtaskNames[i] = taskName + "_" + i;
+ }
+ // Some subtasks are pending to be run.
+ TaskPartitionState[] helixStates = new TaskPartitionState[]{TaskPartitionState.RUNNING, null, null};
+ httpResp._httpResponses.put(workers[0],
+ JsonUtils.objectToString(Collections.singletonMap(subtaskNames[0], "running on worker: 0")));
+ when(jobContext.getTaskIdForPartition(anyInt())).thenReturn(subtaskNames[0], subtaskNames[1], subtaskNames[2]);
+ when(jobContext.getAssignedParticipant(anyInt())).thenReturn(workers[0], null, null);
+ when(jobContext.getPartitionState(anyInt())).thenReturn(helixStates[0], null, null);
+ when(jobContext.getPartitionSet()).thenReturn(subtaskIds);
+ Map<String, Object> progress =
+ mgr.getSubtaskProgress(taskName, StringUtils.join(subtaskNames, ','), httpHelper, workerEndpoints,
+ Collections.emptyMap(), 1000);
+ String taskProgress = (String) progress.get(subtaskNames[0]);
+ assertEquals(taskProgress, "running on worker: 0");
+ taskProgress = (String) progress.get(subtaskNames[1]);
+ assertEquals(taskProgress, "No worker has run this subtask");
+ taskProgress = (String) progress.get(subtaskNames[2]);
+ assertEquals(taskProgress, "No worker has run this subtask");
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org