You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/09/20 22:13:50 UTC

[GitHub] [pinot] zhtaoxiang commented on a diff in pull request #9432: use MinionEventObserver to track finer grained task progress status on worker

zhtaoxiang commented on code in PR #9432:
URL: https://github.com/apache/pinot/pull/9432#discussion_r975808771


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java:
##########
@@ -456,6 +461,75 @@ public synchronized Map<String, PinotTaskConfig> getSubtaskConfigs(String taskNa
     return taskConfigs;
   }
 
+  public synchronized Map<String, String> getSubtaskProgress(String taskName, @Nullable String subtaskNames,
+      Executor executor, HttpConnectionManager connMgr, Map<String, String> workerEndpoints,
+      Map<String, String> requestHeaders, int timeoutMs)
+      throws Exception {
+    String taskType = getTaskType(taskName);
+    WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
+    if (workflowContext == null) {
+      throw new UnknownTaskTypeException("Workflow context for task type doesn't exist: " + taskType);
+    }
+    String helixJobName = getHelixJobName(taskName);
+    JobContext jobContext = _taskDriver.getJobContext(helixJobName);
+    if (jobContext == null) {
+      throw new NoTaskScheduledException("No task scheduled with name: " + helixJobName);
+    }
+    Set<String> selectedSubtasks = new HashSet<>();
+    if (StringUtils.isNotEmpty(subtaskNames)) {
+      Collections.addAll(selectedSubtasks, StringUtils.split(subtaskNames, ','));

Review Comment:
   nit: define a constant variable for the delimiter `,`? 



##########
pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java:
##########
@@ -107,6 +112,9 @@ public void init(PinotConfiguration config)
     MinionTaskZkMetadataManager minionTaskZkMetadataManager = new MinionTaskZkMetadataManager(_helixManager);
     _taskExecutorFactoryRegistry = new TaskExecutorFactoryRegistry(minionTaskZkMetadataManager, _config);
     _eventObserverFactoryRegistry = new EventObserverFactoryRegistry(minionTaskZkMetadataManager);
+    _executorService =
+        Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("async-task-thread-%d").build());

Review Comment:
   maybe we can name it "event-observer-task-thread-%d" to make it easy to debug?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java:
##########
@@ -456,6 +461,75 @@ public synchronized Map<String, PinotTaskConfig> getSubtaskConfigs(String taskNa
     return taskConfigs;
   }
 
+  public synchronized Map<String, String> getSubtaskProgress(String taskName, @Nullable String subtaskNames,
+      Executor executor, HttpConnectionManager connMgr, Map<String, String> workerEndpoints,
+      Map<String, String> requestHeaders, int timeoutMs)
+      throws Exception {
+    String taskType = getTaskType(taskName);
+    WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
+    if (workflowContext == null) {
+      throw new UnknownTaskTypeException("Workflow context for task type doesn't exist: " + taskType);
+    }
+    String helixJobName = getHelixJobName(taskName);
+    JobContext jobContext = _taskDriver.getJobContext(helixJobName);
+    if (jobContext == null) {
+      throw new NoTaskScheduledException("No task scheduled with name: " + helixJobName);
+    }
+    Set<String> selectedSubtasks = new HashSet<>();
+    if (StringUtils.isNotEmpty(subtaskNames)) {
+      Collections.addAll(selectedSubtasks, StringUtils.split(subtaskNames, ','));
+    }
+    Map<String, String> allSubtaskWorkerMap = new HashMap<>();
+    Map<String, Set<String>> workerSelectedSubtasksMap = new HashMap<>();
+    for (int partition : jobContext.getPartitionSet()) {
+      String subtaskName = jobContext.getTaskIdForPartition(partition);
+      String worker = jobContext.getAssignedParticipant(partition);
+      allSubtaskWorkerMap.put(subtaskName, worker);
+      if (selectedSubtasks.isEmpty() || selectedSubtasks.contains(subtaskName)) {
+        workerSelectedSubtasksMap.computeIfAbsent(worker, k -> new HashSet<>()).add(subtaskName);
+      }
+    }
+    LOGGER.debug("Found subtasks on workers: {}", workerSelectedSubtasksMap);
+    List<String> workerUrls = new ArrayList<>();
+    workerSelectedSubtasksMap.forEach((workerId, subtasksOnWorker) -> workerUrls.add(String
+        .format("%s/tasks/subtask/progress?subtaskNames=%s", workerEndpoints.get(workerId),
+            StringUtils.join(subtasksOnWorker, ","))));
+    LOGGER.debug("Getting task progress with workerUrls: {}", workerUrls);
+    // Scatter and gather progress from multiple workers.
+    Map<String, String> subtaskProgressMap = new HashMap<>();
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(executor, connMgr, HashBiMap.create(0));
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(workerUrls, null, true, requestHeaders, timeoutMs);
+    for (Map.Entry<String, String> entry : serviceResponse._httpResponses.entrySet()) {
+      String worker = entry.getKey();
+      String resp = entry.getValue();
+      LOGGER.debug("Got resp: {} from worker: {}", resp, worker);
+      if (StringUtils.isNotEmpty(resp)) {
+        subtaskProgressMap.putAll(JsonUtils.stringToObject(resp, Map.class));
+      }
+    }
+    // Check if any subtask missed their progress from the worker.
+    for (String subtaskName : selectedSubtasks) {
+      if (subtaskProgressMap.containsKey(subtaskName)) {
+        continue;
+      }
+      String worker = allSubtaskWorkerMap.get(subtaskName);
+      if (StringUtils.isEmpty(worker)) {
+        subtaskProgressMap.put(subtaskName, "No worker has run this subtask");
+      } else {
+        subtaskProgressMap.put(subtaskName, "No progress from worker: " + worker);

Review Comment:
   I feel we need to further break this part into different cases: (1) the task is assigned but has not been started, and (2) the task finished but its progress was deleted due to retention or a minion restart.
   
   "No progress" does not seem clear enough to me. What do you think?



##########
pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObservers.java:
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.minion.event;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.pinot.minion.MinionConf;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class MinionEventObservers {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MinionEventObservers.class);
+
+  private static final MinionEventObservers DEFAULT_INSTANCE = new MinionEventObservers();
+  private static volatile MinionEventObservers _customInstance = null;
+
+  private final Map<String, MinionEventObserver> _taskEventObservers = new ConcurrentHashMap<>();
+  // Tasks are added roughly in time order, so use LinkedList instead of PriorityQueue for simplicity.
+  private final Queue<EndedTask> _endedTaskQueue = new LinkedList<>();
+  private final ReentrantLock _queueLock = new ReentrantLock();
+  private final Condition _availableToClean = _queueLock.newCondition();

Review Comment:
   I feel that we can remove the synchronization logic by (1) using `ConcurrentLinkedQueue`, and (2) running the cleanup periodically (since the cleanup time does not need to be accurate).



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutor.java:
##########
@@ -113,6 +118,9 @@ public Object executeTask(PinotTaskConfig pinotTaskConfig)
     SegmentGenerationAndPushResult.Builder resultBuilder = new SegmentGenerationAndPushResult.Builder();
     File localTempDir = new File(new File(MinionContext.getInstance().getDataDir(), "SegmentGenerationAndPushResult"),
         "tmp-" + UUID.randomUUID());
+    _pinotTaskConfig = pinotTaskConfig;
+    _eventObserver = MinionEventObservers.getInstance().getMinionEventObserver(pinotTaskConfig.getTaskId());
+    _eventObserver.notifyProgress(pinotTaskConfig, "Task running");

Review Comment:
   I feel that we may be able to define a minion progress enum so that progress information is standardized?



##########
pinot-minion/src/main/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResource.java:
##########
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.minion.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.util.HashMap;
+import java.util.Map;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pinot.minion.event.MinionEventObserver;
+import org.apache.pinot.minion.event.MinionEventObservers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+
+
+/**
+ * Get finer grained progress of tasks running on the minion worker.
+ */
+@Api(tags = "Progress", authorizations = {@Authorization(value = SWAGGER_AUTHORIZATION_KEY)})
+@SwaggerDefinition(securityDefinition = @SecurityDefinition(apiKeyAuthDefinitions = @ApiKeyAuthDefinition(name =
+    HttpHeaders.AUTHORIZATION, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY)))
+@Path("/")
+public class PinotTaskProgressResource {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotTaskProgressResource.class);
+
+  @GET
+  @Path("/tasks/subtask/progress")
+  @ApiOperation("Get finer grained task progress tracked in memory for the given subtasks")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error")
+  })
+  public Object getSubtaskProgress(
+      @ApiParam(value = "Sub task names separated by comma") @QueryParam("subtaskNames") String subtaskNames) {
+    try {
+      LOGGER.debug("Get progress for subtasks: {}", subtaskNames);
+      Map<String, Object> progress = new HashMap<>();
+      for (String subtaskName : StringUtils.split(subtaskNames, ',')) {

Review Comment:
   nit: use the same constant variable if we decide to add it.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java:
##########
@@ -456,6 +461,75 @@ public synchronized Map<String, PinotTaskConfig> getSubtaskConfigs(String taskNa
     return taskConfigs;
   }
 
+  public synchronized Map<String, String> getSubtaskProgress(String taskName, @Nullable String subtaskNames,
+      Executor executor, HttpConnectionManager connMgr, Map<String, String> workerEndpoints,
+      Map<String, String> requestHeaders, int timeoutMs)
+      throws Exception {
+    String taskType = getTaskType(taskName);
+    WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
+    if (workflowContext == null) {
+      throw new UnknownTaskTypeException("Workflow context for task type doesn't exist: " + taskType);
+    }
+    String helixJobName = getHelixJobName(taskName);
+    JobContext jobContext = _taskDriver.getJobContext(helixJobName);
+    if (jobContext == null) {
+      throw new NoTaskScheduledException("No task scheduled with name: " + helixJobName);
+    }
+    Set<String> selectedSubtasks = new HashSet<>();
+    if (StringUtils.isNotEmpty(subtaskNames)) {
+      Collections.addAll(selectedSubtasks, StringUtils.split(subtaskNames, ','));
+    }
+    Map<String, String> allSubtaskWorkerMap = new HashMap<>();
+    Map<String, Set<String>> workerSelectedSubtasksMap = new HashMap<>();
+    for (int partition : jobContext.getPartitionSet()) {
+      String subtaskName = jobContext.getTaskIdForPartition(partition);
+      String worker = jobContext.getAssignedParticipant(partition);
+      allSubtaskWorkerMap.put(subtaskName, worker);
+      if (selectedSubtasks.isEmpty() || selectedSubtasks.contains(subtaskName)) {
+        workerSelectedSubtasksMap.computeIfAbsent(worker, k -> new HashSet<>()).add(subtaskName);
+      }
+    }
+    LOGGER.debug("Found subtasks on workers: {}", workerSelectedSubtasksMap);
+    List<String> workerUrls = new ArrayList<>();
+    workerSelectedSubtasksMap.forEach((workerId, subtasksOnWorker) -> workerUrls.add(String
+        .format("%s/tasks/subtask/progress?subtaskNames=%s", workerEndpoints.get(workerId),
+            StringUtils.join(subtasksOnWorker, ","))));
+    LOGGER.debug("Getting task progress with workerUrls: {}", workerUrls);
+    // Scatter and gather progress from multiple workers.
+    Map<String, String> subtaskProgressMap = new HashMap<>();
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(executor, connMgr, HashBiMap.create(0));
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(workerUrls, null, true, requestHeaders, timeoutMs);
+    for (Map.Entry<String, String> entry : serviceResponse._httpResponses.entrySet()) {
+      String worker = entry.getKey();
+      String resp = entry.getValue();
+      LOGGER.debug("Got resp: {} from worker: {}", resp, worker);
+      if (StringUtils.isNotEmpty(resp)) {
+        subtaskProgressMap.putAll(JsonUtils.stringToObject(resp, Map.class));
+      }
+    }
+    // Check if any subtask missed their progress from the worker.
+    for (String subtaskName : selectedSubtasks) {
+      if (subtaskProgressMap.containsKey(subtaskName)) {
+        continue;
+      }
+      String worker = allSubtaskWorkerMap.get(subtaskName);
+      if (StringUtils.isEmpty(worker)) {
+        subtaskProgressMap.put(subtaskName, "No worker has run this subtask");
+      } else {
+        subtaskProgressMap.put(subtaskName, "No progress from worker: " + worker);
+      }
+    }
+    // Raise error if any worker failed to report progress, with partial result.
+    if (serviceResponse._failedResponseCount > 0) {
+      throw new RuntimeException(String
+          .format("There were %d workers failed to report task progress. Got partial progress info: %s",
+              serviceResponse._failedResponseCount, subtaskProgressMap));

Review Comment:
   I feel there is another way of passing this information back: instead of throwing an exception, we can return normally, with the returned message carrying the worker failure information. WDYT?



##########
pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java:
##########
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.minion.event;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import javax.annotation.Nullable;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A minion event observer that can track task progress status in memory.
+ */
+public class MinionProgressObserver implements MinionEventObserver {

Review Comment:
   nit: is it possible that synchronization is needed in the future?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org