You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/10/06 10:26:32 UTC

[GitHub] [druid] liran-funaro commented on a change in pull request #10376: Live reporting system for parallel task

liran-funaro commented on a change in pull request #10376:
URL: https://github.com/apache/druid/pull/10376#discussion_r500166537



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
##########
@@ -288,28 +297,48 @@ private void stopInternal()
   }
 
   @Override
-  public void collectReport(SubTaskReportType report)
+  public void collectLiveReport(RunningSubtaskReport report)
   {
-    // subTasks might send their reports multiple times because of the HTTP retry.
+    liveReportsMap.compute(report.getTaskId(), (taskId, prevReportCreatedTime) -> {
+      if (prevReportCreatedTime == null || prevReportCreatedTime != report.getCreatedTimeNs()) {
+        // TODO: the metrics in the report will be processed here.
+      }
+      taskMonitor.statusReport(report.getTaskId(), report.getState());
+      return report.getCreatedTimeNs();
+    });
+  }
+
+  @Override
+  public void collectReport(FinalReportType report)
+  {
+    // Even though each subtask is supposed to send its final report only once, supervisor task might receive
+    // the same report multiple times because of the HTTP retry.
     // Here, we simply make sure the current report is exactly same with the previous one.
-    reportsMap.compute(report.getTaskId(), (taskId, prevReport) -> {
-      if (prevReport != null) {
+    finalReportsMap.compute(report.getTaskId(), (taskId, prevReport) -> {
+      if (prevReport == null) {
+        // TODO: the metrics in the report will be processed here.

Review comment:
       What kind of processing is required here that wasn't needed before?
   The same question applies to `collectLiveReport()`.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
##########
@@ -100,73 +100,84 @@
   @GuardedBy("startStopLock")
   private boolean running = false;
 
-  TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int estimatedNumSucceededTasks)
+  TaskMonitor(IndexingServiceClient indexingServiceClient, int maxSubtaskRetries, int estimatedNumSucceededTasks)
   {
     this.indexingServiceClient = Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
-    this.maxRetry = maxRetry;
+    this.maxSubtaskRetries = maxSubtaskRetries;
     this.estimatedNumSucceededTasks = estimatedNumSucceededTasks;
 
     log.info("TaskMonitor is initialized with estimatedNumSucceededTasks[%d]", estimatedNumSucceededTasks);
   }
 
-  public void start(long taskStatusCheckingPeriod)
+  public void start(long taskStatusCheckPeriodMs, long liveReportTimeoutMs)
   {
+    final long liveReportTimeoutNs = TimeUnit.MILLISECONDS.toNanos(liveReportTimeoutMs);
     synchronized (startStopLock) {
       running = true;
       log.info("Starting taskMonitor");
-      // NOTE: This polling can be improved to event-driven pushing by registering TaskRunnerListener to TaskRunner.
-      // That listener should be able to send the events reported to TaskRunner to this TaskMonitor.
+      // In Parallel task, subtasks periodically report their states with metrics. However, this could not be
+      // enough for monitoring subtask status because the report can be missing or even wrong for various reasons
+      // in distributed systems. TaskMonitor always checks the final status of subtask with the Overlord where
+      // is the source of truth for task statuses.
       taskStatusChecker.scheduleAtFixedRate(
           () -> {
             try {
               final Iterator<Entry<String, MonitorEntry>> iterator = runningTasks.entrySet().iterator();
               while (iterator.hasNext()) {
                 final Entry<String, MonitorEntry> entry = iterator.next();
-                final String specId = entry.getKey();
+                final String taskId = entry.getKey();
                 final MonitorEntry monitorEntry = entry.getValue();
-                final String taskId = monitorEntry.runningTask.getId();
-                final TaskStatusResponse taskStatusResponse = indexingServiceClient.getTaskStatus(taskId);
-                final TaskStatusPlus taskStatus = taskStatusResponse.getStatus();
-                if (taskStatus != null) {
-                  switch (Preconditions.checkNotNull(taskStatus.getStatusCode(), "taskState")) {
-                    case SUCCESS:
-                      incrementNumSucceededTasks();
-
-                      // Remote the current entry after updating taskHistories to make sure that task history
-                      // exists either runningTasks or taskHistories.
-                      monitorEntry.setLastStatus(taskStatus);
-                      iterator.remove();
-                      break;
-                    case FAILED:
-                      incrementNumFailedTasks();
-
-                      log.warn("task[%s] failed!", taskId);
-                      if (monitorEntry.numTries() < maxRetry) {
-                        log.info(
-                            "We still have more chances[%d/%d] to process the spec[%s].",
-                            monitorEntry.numTries(),
-                            maxRetry,
-                            monitorEntry.spec.getId()
-                        );
-                        retry(specId, monitorEntry, taskStatus);
-                      } else {
-                        log.error(
-                            "spec[%s] failed after [%d] tries",
-                            monitorEntry.spec.getId(),
-                            monitorEntry.numTries()
-                        );
-                        // Remote the current entry after updating taskHistories to make sure that task history
+
+                // We here measure the current time for individual subtask because it could take long time to talk to
+                // the Overlord.
+                if (monitorEntry.needStatusCheck(System.nanoTime(), liveReportTimeoutNs)) {
+                  final TaskStatusResponse taskStatusResponse = indexingServiceClient.getTaskStatus(taskId);
+                  final TaskStatusPlus taskStatus = taskStatusResponse.getStatus();
+                  if (taskStatus != null) {
+                    switch (Preconditions.checkNotNull(taskStatus.getStatusCode(), "taskState")) {
+                      case SUCCESS:
+                        incrementNumSucceededTasks();
+
+                        // Remove the current entry after updating taskHistories to make sure that task history
                         // exists either runningTasks or taskHistories.
                         monitorEntry.setLastStatus(taskStatus);
                         iterator.remove();
-                      }
-                      break;
-                    case RUNNING:
-                      monitorEntry.updateStatus(taskStatus);
-                      break;
-                    default:
-                      throw new ISE("Unknown taskStatus[%s] for task[%s[", taskStatus.getStatusCode(), taskId);
+                        break;
+                      case FAILED:
+                        incrementNumFailedTasks();
+
+                        log.warn("task[%s] failed!", taskId);

Review comment:
       `taskId` or `specId`?

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
##########
@@ -100,73 +100,84 @@
   @GuardedBy("startStopLock")
   private boolean running = false;
 
-  TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int estimatedNumSucceededTasks)
+  TaskMonitor(IndexingServiceClient indexingServiceClient, int maxSubtaskRetries, int estimatedNumSucceededTasks)
   {
     this.indexingServiceClient = Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
-    this.maxRetry = maxRetry;
+    this.maxSubtaskRetries = maxSubtaskRetries;
     this.estimatedNumSucceededTasks = estimatedNumSucceededTasks;
 
     log.info("TaskMonitor is initialized with estimatedNumSucceededTasks[%d]", estimatedNumSucceededTasks);
   }
 
-  public void start(long taskStatusCheckingPeriod)
+  public void start(long taskStatusCheckPeriodMs, long liveReportTimeoutMs)
   {
+    final long liveReportTimeoutNs = TimeUnit.MILLISECONDS.toNanos(liveReportTimeoutMs);
     synchronized (startStopLock) {
       running = true;
       log.info("Starting taskMonitor");
-      // NOTE: This polling can be improved to event-driven pushing by registering TaskRunnerListener to TaskRunner.
-      // That listener should be able to send the events reported to TaskRunner to this TaskMonitor.
+      // In Parallel task, subtasks periodically report their states with metrics. However, this could not be
+      // enough for monitoring subtask status because the report can be missing or even wrong for various reasons
+      // in distributed systems. TaskMonitor always checks the final status of subtask with the Overlord where
+      // is the source of truth for task statuses.
       taskStatusChecker.scheduleAtFixedRate(
           () -> {
             try {
               final Iterator<Entry<String, MonitorEntry>> iterator = runningTasks.entrySet().iterator();
               while (iterator.hasNext()) {
                 final Entry<String, MonitorEntry> entry = iterator.next();
-                final String specId = entry.getKey();
+                final String taskId = entry.getKey();
                 final MonitorEntry monitorEntry = entry.getValue();
-                final String taskId = monitorEntry.runningTask.getId();
-                final TaskStatusResponse taskStatusResponse = indexingServiceClient.getTaskStatus(taskId);
-                final TaskStatusPlus taskStatus = taskStatusResponse.getStatus();
-                if (taskStatus != null) {
-                  switch (Preconditions.checkNotNull(taskStatus.getStatusCode(), "taskState")) {
-                    case SUCCESS:
-                      incrementNumSucceededTasks();
-
-                      // Remote the current entry after updating taskHistories to make sure that task history
-                      // exists either runningTasks or taskHistories.
-                      monitorEntry.setLastStatus(taskStatus);
-                      iterator.remove();
-                      break;
-                    case FAILED:
-                      incrementNumFailedTasks();
-
-                      log.warn("task[%s] failed!", taskId);
-                      if (monitorEntry.numTries() < maxRetry) {
-                        log.info(
-                            "We still have more chances[%d/%d] to process the spec[%s].",
-                            monitorEntry.numTries(),
-                            maxRetry,
-                            monitorEntry.spec.getId()
-                        );
-                        retry(specId, monitorEntry, taskStatus);
-                      } else {
-                        log.error(
-                            "spec[%s] failed after [%d] tries",
-                            monitorEntry.spec.getId(),
-                            monitorEntry.numTries()
-                        );
-                        // Remote the current entry after updating taskHistories to make sure that task history
+
+                // We here measure the current time for individual subtask because it could take long time to talk to
+                // the Overlord.
+                if (monitorEntry.needStatusCheck(System.nanoTime(), liveReportTimeoutNs)) {
+                  final TaskStatusResponse taskStatusResponse = indexingServiceClient.getTaskStatus(taskId);

Review comment:
       This line wasn't modified, but the `taskId` used here now holds what was `specId` before this PR.
   Is this intentional? Is `specId` equal to `taskId`?
   See similar issue below.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
##########
@@ -379,24 +419,21 @@ public Granularity getSegmentGranularity()
       pushedSegments.addAll(pushed.getSegments());
       LOG.info("Pushed [%s] segments", pushed.getSegments().size());
       LOG.infoSegments(pushed.getSegments(), "Pushed segments");
-      appenderator.close();

Review comment:
       Nice catch!

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -144,6 +145,8 @@
 
   private final ConcurrentHashMap<Interval, AtomicInteger> partitionNumCountersPerInterval = new ConcurrentHashMap<>();
 
+  private final List<ParallelIndexTaskRunner<?, ?>> phaseRunners = new ArrayList<>();

Review comment:
       I don't see anywhere that these `phaseRunners` are being reused. What is the justification for keeping them in this array?
   Below, I see that you create them anyway in each method call, but use their enumeration to fetch them from the list.
   Do you assume the order of the calls?
   
   If this array was indeed intended for reusability of the runners, I suggest replacing it with an enumeration-based map (`Map<Enum, ParallelIndexTaskRunner>`) and creating the runner only when it has not been created before.
   

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
##########
@@ -100,73 +100,84 @@
   @GuardedBy("startStopLock")
   private boolean running = false;
 
-  TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int estimatedNumSucceededTasks)
+  TaskMonitor(IndexingServiceClient indexingServiceClient, int maxSubtaskRetries, int estimatedNumSucceededTasks)
   {
     this.indexingServiceClient = Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
-    this.maxRetry = maxRetry;
+    this.maxSubtaskRetries = maxSubtaskRetries;
     this.estimatedNumSucceededTasks = estimatedNumSucceededTasks;
 
     log.info("TaskMonitor is initialized with estimatedNumSucceededTasks[%d]", estimatedNumSucceededTasks);
   }
 
-  public void start(long taskStatusCheckingPeriod)
+  public void start(long taskStatusCheckPeriodMs, long liveReportTimeoutMs)
   {
+    final long liveReportTimeoutNs = TimeUnit.MILLISECONDS.toNanos(liveReportTimeoutMs);
     synchronized (startStopLock) {
       running = true;
       log.info("Starting taskMonitor");
-      // NOTE: This polling can be improved to event-driven pushing by registering TaskRunnerListener to TaskRunner.
-      // That listener should be able to send the events reported to TaskRunner to this TaskMonitor.
+      // In Parallel task, subtasks periodically report their states with metrics. However, this could not be
+      // enough for monitoring subtask status because the report can be missing or even wrong for various reasons
+      // in distributed systems. TaskMonitor always checks the final status of subtask with the Overlord where
+      // is the source of truth for task statuses.
       taskStatusChecker.scheduleAtFixedRate(
           () -> {
             try {
               final Iterator<Entry<String, MonitorEntry>> iterator = runningTasks.entrySet().iterator();
               while (iterator.hasNext()) {
                 final Entry<String, MonitorEntry> entry = iterator.next();
-                final String specId = entry.getKey();
+                final String taskId = entry.getKey();
                 final MonitorEntry monitorEntry = entry.getValue();
-                final String taskId = monitorEntry.runningTask.getId();
-                final TaskStatusResponse taskStatusResponse = indexingServiceClient.getTaskStatus(taskId);
-                final TaskStatusPlus taskStatus = taskStatusResponse.getStatus();
-                if (taskStatus != null) {
-                  switch (Preconditions.checkNotNull(taskStatus.getStatusCode(), "taskState")) {
-                    case SUCCESS:
-                      incrementNumSucceededTasks();
-
-                      // Remote the current entry after updating taskHistories to make sure that task history
-                      // exists either runningTasks or taskHistories.
-                      monitorEntry.setLastStatus(taskStatus);
-                      iterator.remove();
-                      break;
-                    case FAILED:
-                      incrementNumFailedTasks();
-
-                      log.warn("task[%s] failed!", taskId);
-                      if (monitorEntry.numTries() < maxRetry) {
-                        log.info(
-                            "We still have more chances[%d/%d] to process the spec[%s].",
-                            monitorEntry.numTries(),
-                            maxRetry,
-                            monitorEntry.spec.getId()
-                        );
-                        retry(specId, monitorEntry, taskStatus);
-                      } else {
-                        log.error(
-                            "spec[%s] failed after [%d] tries",
-                            monitorEntry.spec.getId(),
-                            monitorEntry.numTries()
-                        );
-                        // Remote the current entry after updating taskHistories to make sure that task history
+
+                // We here measure the current time for individual subtask because it could take long time to talk to
+                // the Overlord.
+                if (monitorEntry.needStatusCheck(System.nanoTime(), liveReportTimeoutNs)) {
+                  final TaskStatusResponse taskStatusResponse = indexingServiceClient.getTaskStatus(taskId);
+                  final TaskStatusPlus taskStatus = taskStatusResponse.getStatus();
+                  if (taskStatus != null) {
+                    switch (Preconditions.checkNotNull(taskStatus.getStatusCode(), "taskState")) {
+                      case SUCCESS:
+                        incrementNumSucceededTasks();
+
+                        // Remove the current entry after updating taskHistories to make sure that task history
                         // exists either runningTasks or taskHistories.
                         monitorEntry.setLastStatus(taskStatus);
                         iterator.remove();
-                      }
-                      break;
-                    case RUNNING:
-                      monitorEntry.updateStatus(taskStatus);
-                      break;
-                    default:
-                      throw new ISE("Unknown taskStatus[%s] for task[%s[", taskStatus.getStatusCode(), taskId);
+                        break;
+                      case FAILED:
+                        incrementNumFailedTasks();
+
+                        log.warn("task[%s] failed!", taskId);
+                        if (monitorEntry.numTries() < maxSubtaskRetries) {
+                          log.info(
+                              "We still have more chances[%d/%d] to process the spec[%s].",
+                              monitorEntry.numTries(),
+                              maxSubtaskRetries,
+                              monitorEntry.spec.getId()
+                          );
+                          retry(monitorEntry, taskStatus);
+                        } else {
+                          log.error(
+                              "spec[%s] failed after [%d] tries",
+                              monitorEntry.spec.getId(),
+                              monitorEntry.numTries()
+                          );
+                          // Remove the current entry after updating taskHistories to make sure that task history
+                          // exists either runningTasks or taskHistories.
+                          monitorEntry.setLastStatus(taskStatus);
+                          iterator.remove();
+                        }
+                        break;
+                      case RUNNING:
+                        monitorEntry.updateRunningStatus(taskStatus);
+                        break;
+                      default:
+                        throw new ISE("Unknown taskStatus[%s] for task[%s[", taskStatus.getStatusCode(), taskId);

Review comment:
       `taskId` or `specId`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org