Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/09/10 01:46:13 UTC

[GitHub] [druid] jihoonson opened a new pull request #10376: Live reporting system for parallel task

jihoonson opened a new pull request #10376:
URL: https://github.com/apache/druid/pull/10376


   Part of https://github.com/apache/druid/issues/10352.
   
   ### Description
   
   This PR implements the live reporting system for parallel ingestion described in #10352. The live reporting system helps the supervisor task check the status of each subtask. During ingestion, each subtask can now periodically send the supervisor task a report that includes its state and metrics (`LiveMetricsReporter`). When the supervisor task receives a report from a running subtask, it assumes that the subtask is still running and pauses status checking with the Overlord for a while. As a result, the supervisor task is expected to talk to the Overlord less often than before; the Overlord could otherwise become a bottleneck when scaling the ingestion system. When the supervisor task receives a report from a finished subtask, it still checks the final status of that subtask with the Overlord so that the final status is consistent across the whole cluster. The supervisor task also keeps a timer per subtask; if reports go missing, the timer triggers a status check with the Overlord. As a result, missing reports do not affect the progress of the supervisor task.
   This behavior is implemented in `TaskMonitor` and `ParallelIndexPhaseRunner`.
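
The per-subtask timer described above can be sketched roughly as follows. This is a minimal illustration, not the PR's `MonitorEntry`: the class `SubtaskTimerSketch` and the method `onLiveReport` are hypothetical names, and only the shape of `needStatusCheck(nowNs, liveReportTimeoutNs)` mirrors the call that appears in the `TaskMonitor` diff.

```java
/** Hypothetical per-subtask timer; an assumption for illustration, not the PR's MonitorEntry. */
final class SubtaskTimerSketch
{
  // Time (from a monotonic clock such as System.nanoTime()) of the last live report.
  private volatile long lastReportTimeNs;

  SubtaskTimerSketch(long nowNs)
  {
    this.lastReportTimeNs = nowNs;
  }

  /** Called when a live report arrives from a running subtask; restarts the timer. */
  void onLiveReport(long nowNs)
  {
    lastReportTimeNs = nowNs;
  }

  /** Returns true when no report arrived within the timeout, so the Overlord should be asked. */
  boolean needStatusCheck(long nowNs, long liveReportTimeoutNs)
  {
    // Comparing the difference (not the raw values) stays correct if nanoTime wraps around.
    return nowNs - lastReportTimeNs >= liveReportTimeoutNs;
  }
}
```

With this shape, a subtask that keeps reporting is never polled, while a subtask whose reports stop arriving falls back to a status check with the Overlord once the timeout elapses.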
   
   A new interface `IngestionMetrics` is added in this PR, but it has only a no-op implementation so far. I plan to add actual implementations in follow-up PRs.
   
   For reviewers: even though this PR adds more than 2000 new lines, it should not be that hard to review, as most of the new code consists of POJO classes for the different report types.
   
   <hr>
   
   This PR has:
   - [x] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [x] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/licenses.yaml)
   - [x] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   
   
   <hr>
   
   ##### Key changed/added classes in this PR
    * `LiveMetricsReporter`
    * `ParallelIndexPhaseRunner`
    * `TaskMonitor`
    * `IngestionMetrics`


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] liran-funaro commented on a change in pull request #10376: Live reporting system for parallel task

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10376:
URL: https://github.com/apache/druid/pull/10376#discussion_r500166537



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
##########
@@ -288,28 +297,48 @@ private void stopInternal()
   }
 
   @Override
-  public void collectReport(SubTaskReportType report)
+  public void collectLiveReport(RunningSubtaskReport report)
   {
-    // subTasks might send their reports multiple times because of the HTTP retry.
+    liveReportsMap.compute(report.getTaskId(), (taskId, prevReportCreatedTime) -> {
+      if (prevReportCreatedTime == null || prevReportCreatedTime != report.getCreatedTimeNs()) {
+        // TODO: the metrics in the report will be processed here.
+      }
+      taskMonitor.statusReport(report.getTaskId(), report.getState());
+      return report.getCreatedTimeNs();
+    });
+  }
+
+  @Override
+  public void collectReport(FinalReportType report)
+  {
+    // Even though each subtask is supposed to send its final report only once, supervisor task might receive
+    // the same report multiple times because of the HTTP retry.
     // Here, we simply make sure the current report is exactly same with the previous one.
-    reportsMap.compute(report.getTaskId(), (taskId, prevReport) -> {
-      if (prevReport != null) {
+    finalReportsMap.compute(report.getTaskId(), (taskId, prevReport) -> {
+      if (prevReport == null) {
+        // TODO: the metrics in the report will be processed here.

Review comment:
       What kind of processing is required here that wasn't needed before?
   Same question for `collectLiveReport()`?
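
For context on the hunk above, here is a minimal sketch of the deduplication pattern it relies on: a report that is re-sent (for example by an HTTP retry) carries the same creation time, so it is processed only once. `LiveReportDedupSketch`, `collect`, and `processedCount` are hypothetical names, and the counter is only a stand-in for whatever metrics processing the TODO refers to.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical collector; an assumption for illustration, not the PR's ParallelIndexPhaseRunner. */
final class LiveReportDedupSketch
{
  // Last seen report creation time per task ID.
  private final ConcurrentHashMap<String, Long> lastCreatedTimeNs = new ConcurrentHashMap<>();
  // Stand-in for real metrics processing: counts reports that were actually processed.
  private final AtomicInteger processed = new AtomicInteger();

  void collect(String taskId, long createdTimeNs)
  {
    // compute() runs atomically per key, so concurrent retries cannot both be processed.
    lastCreatedTimeNs.compute(taskId, (id, prev) -> {
      // prev is a boxed Long; unbox explicitly so the comparison is by value.
      if (prev == null || prev.longValue() != createdTimeNs) {
        processed.incrementAndGet();
      }
      return createdTimeNs;
    });
  }

  int processedCount()
  {
    return processed.get();
  }
}
```

Note that this only filters a report that repeats the most recent creation time; it does not detect an older report arriving late.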

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
##########
@@ -100,73 +100,84 @@
   @GuardedBy("startStopLock")
   private boolean running = false;
 
-  TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int estimatedNumSucceededTasks)
+  TaskMonitor(IndexingServiceClient indexingServiceClient, int maxSubtaskRetries, int estimatedNumSucceededTasks)
   {
     this.indexingServiceClient = Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
-    this.maxRetry = maxRetry;
+    this.maxSubtaskRetries = maxSubtaskRetries;
     this.estimatedNumSucceededTasks = estimatedNumSucceededTasks;
 
     log.info("TaskMonitor is initialized with estimatedNumSucceededTasks[%d]", estimatedNumSucceededTasks);
   }
 
-  public void start(long taskStatusCheckingPeriod)
+  public void start(long taskStatusCheckPeriodMs, long liveReportTimeoutMs)
   {
+    final long liveReportTimeoutNs = TimeUnit.MILLISECONDS.toNanos(liveReportTimeoutMs);
     synchronized (startStopLock) {
       running = true;
       log.info("Starting taskMonitor");
-      // NOTE: This polling can be improved to event-driven pushing by registering TaskRunnerListener to TaskRunner.
-      // That listener should be able to send the events reported to TaskRunner to this TaskMonitor.
+      // In Parallel task, subtasks periodically report their states with metrics. However, this could not be
+      // enough for monitoring subtask status because the report can be missing or even wrong for various reasons
+      // in distributed systems. TaskMonitor always checks the final status of subtask with the Overlord where
+      // is the source of truth for task statuses.
       taskStatusChecker.scheduleAtFixedRate(
           () -> {
             try {
               final Iterator<Entry<String, MonitorEntry>> iterator = runningTasks.entrySet().iterator();
               while (iterator.hasNext()) {
                 final Entry<String, MonitorEntry> entry = iterator.next();
-                final String specId = entry.getKey();
+                final String taskId = entry.getKey();
                 final MonitorEntry monitorEntry = entry.getValue();
-                final String taskId = monitorEntry.runningTask.getId();
-                final TaskStatusResponse taskStatusResponse = indexingServiceClient.getTaskStatus(taskId);
-                final TaskStatusPlus taskStatus = taskStatusResponse.getStatus();
-                if (taskStatus != null) {
-                  switch (Preconditions.checkNotNull(taskStatus.getStatusCode(), "taskState")) {
-                    case SUCCESS:
-                      incrementNumSucceededTasks();
-
-                      // Remote the current entry after updating taskHistories to make sure that task history
-                      // exists either runningTasks or taskHistories.
-                      monitorEntry.setLastStatus(taskStatus);
-                      iterator.remove();
-                      break;
-                    case FAILED:
-                      incrementNumFailedTasks();
-
-                      log.warn("task[%s] failed!", taskId);
-                      if (monitorEntry.numTries() < maxRetry) {
-                        log.info(
-                            "We still have more chances[%d/%d] to process the spec[%s].",
-                            monitorEntry.numTries(),
-                            maxRetry,
-                            monitorEntry.spec.getId()
-                        );
-                        retry(specId, monitorEntry, taskStatus);
-                      } else {
-                        log.error(
-                            "spec[%s] failed after [%d] tries",
-                            monitorEntry.spec.getId(),
-                            monitorEntry.numTries()
-                        );
-                        // Remote the current entry after updating taskHistories to make sure that task history
+
+                // We here measure the current time for individual subtask because it could take long time to talk to
+                // the Overlord.
+                if (monitorEntry.needStatusCheck(System.nanoTime(), liveReportTimeoutNs)) {
+                  final TaskStatusResponse taskStatusResponse = indexingServiceClient.getTaskStatus(taskId);
+                  final TaskStatusPlus taskStatus = taskStatusResponse.getStatus();
+                  if (taskStatus != null) {
+                    switch (Preconditions.checkNotNull(taskStatus.getStatusCode(), "taskState")) {
+                      case SUCCESS:
+                        incrementNumSucceededTasks();
+
+                        // Remove the current entry after updating taskHistories to make sure that task history
                         // exists either runningTasks or taskHistories.
                         monitorEntry.setLastStatus(taskStatus);
                         iterator.remove();
-                      }
-                      break;
-                    case RUNNING:
-                      monitorEntry.updateStatus(taskStatus);
-                      break;
-                    default:
-                      throw new ISE("Unknown taskStatus[%s] for task[%s[", taskStatus.getStatusCode(), taskId);
+                        break;
+                      case FAILED:
+                        incrementNumFailedTasks();
+
+                        log.warn("task[%s] failed!", taskId);

Review comment:
       `taskId` or `specId`?

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
##########
@@ -100,73 +100,84 @@
   @GuardedBy("startStopLock")
   private boolean running = false;
 
-  TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int estimatedNumSucceededTasks)
+  TaskMonitor(IndexingServiceClient indexingServiceClient, int maxSubtaskRetries, int estimatedNumSucceededTasks)
   {
     this.indexingServiceClient = Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
-    this.maxRetry = maxRetry;
+    this.maxSubtaskRetries = maxSubtaskRetries;
     this.estimatedNumSucceededTasks = estimatedNumSucceededTasks;
 
     log.info("TaskMonitor is initialized with estimatedNumSucceededTasks[%d]", estimatedNumSucceededTasks);
   }
 
-  public void start(long taskStatusCheckingPeriod)
+  public void start(long taskStatusCheckPeriodMs, long liveReportTimeoutMs)
   {
+    final long liveReportTimeoutNs = TimeUnit.MILLISECONDS.toNanos(liveReportTimeoutMs);
     synchronized (startStopLock) {
       running = true;
       log.info("Starting taskMonitor");
-      // NOTE: This polling can be improved to event-driven pushing by registering TaskRunnerListener to TaskRunner.
-      // That listener should be able to send the events reported to TaskRunner to this TaskMonitor.
+      // In Parallel task, subtasks periodically report their states with metrics. However, this could not be
+      // enough for monitoring subtask status because the report can be missing or even wrong for various reasons
+      // in distributed systems. TaskMonitor always checks the final status of subtask with the Overlord where
+      // is the source of truth for task statuses.
       taskStatusChecker.scheduleAtFixedRate(
           () -> {
             try {
               final Iterator<Entry<String, MonitorEntry>> iterator = runningTasks.entrySet().iterator();
               while (iterator.hasNext()) {
                 final Entry<String, MonitorEntry> entry = iterator.next();
-                final String specId = entry.getKey();
+                final String taskId = entry.getKey();
                 final MonitorEntry monitorEntry = entry.getValue();
-                final String taskId = monitorEntry.runningTask.getId();
-                final TaskStatusResponse taskStatusResponse = indexingServiceClient.getTaskStatus(taskId);
-                final TaskStatusPlus taskStatus = taskStatusResponse.getStatus();
-                if (taskStatus != null) {
-                  switch (Preconditions.checkNotNull(taskStatus.getStatusCode(), "taskState")) {
-                    case SUCCESS:
-                      incrementNumSucceededTasks();
-
-                      // Remote the current entry after updating taskHistories to make sure that task history
-                      // exists either runningTasks or taskHistories.
-                      monitorEntry.setLastStatus(taskStatus);
-                      iterator.remove();
-                      break;
-                    case FAILED:
-                      incrementNumFailedTasks();
-
-                      log.warn("task[%s] failed!", taskId);
-                      if (monitorEntry.numTries() < maxRetry) {
-                        log.info(
-                            "We still have more chances[%d/%d] to process the spec[%s].",
-                            monitorEntry.numTries(),
-                            maxRetry,
-                            monitorEntry.spec.getId()
-                        );
-                        retry(specId, monitorEntry, taskStatus);
-                      } else {
-                        log.error(
-                            "spec[%s] failed after [%d] tries",
-                            monitorEntry.spec.getId(),
-                            monitorEntry.numTries()
-                        );
-                        // Remote the current entry after updating taskHistories to make sure that task history
+
+                // We here measure the current time for individual subtask because it could take long time to talk to
+                // the Overlord.
+                if (monitorEntry.needStatusCheck(System.nanoTime(), liveReportTimeoutNs)) {
+                  final TaskStatusResponse taskStatusResponse = indexingServiceClient.getTaskStatus(taskId);

Review comment:
       This line wasn't modified, but the `taskId` used here is now what was `specId` before this PR.
   Is this intentional? Does `specId` equal `taskId`?
   See similar issue below.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
##########
@@ -379,24 +419,21 @@ public Granularity getSegmentGranularity()
       pushedSegments.addAll(pushed.getSegments());
       LOG.info("Pushed [%s] segments", pushed.getSegments().size());
       LOG.infoSegments(pushed.getSegments(), "Pushed segments");
-      appenderator.close();

Review comment:
       Nice catch!

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -144,6 +145,8 @@
 
   private final ConcurrentHashMap<Interval, AtomicInteger> partitionNumCountersPerInterval = new ConcurrentHashMap<>();
 
+  private final List<ParallelIndexTaskRunner<?, ?>> phaseRunners = new ArrayList<>();

Review comment:
       I don't see anywhere that these `phaseRunners` are being reused. What is the justification for keeping them in this array?
   Below, I see that you create them anyway in each method call, but use their enumeration to fetch them from the list.
   Do you assume the order of the calls?
   
   If this array was indeed intended for reusability of the runners, I suggest replacing it with an enumeration-based map (`Map<Enum, ParallelIndexTaskRunner>`) and creating a runner only if it has not been created before.
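
The suggestion above could look roughly like the following sketch, which keys runners by an enum and creates each one lazily. The enum `PhaseSketch`, its constants, and the class `PhaseRunnerRegistrySketch` are hypothetical and do not come from the PR; the type parameter `R` stands in for `ParallelIndexTaskRunner<?, ?>`.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical phase identifiers; the real phases in the PR may differ. */
enum PhaseSketch
{
  FIRST_PHASE,
  SECOND_PHASE
}

/** Hypothetical registry that creates each phase's runner at most once. */
final class PhaseRunnerRegistrySketch<R>
{
  private final Map<PhaseSketch, R> runners = new EnumMap<>(PhaseSketch.class);

  /** Returns the existing runner for the phase, or creates and stores one via the factory. */
  synchronized R getOrCreate(PhaseSketch phase, Supplier<R> factory)
  {
    // EnumMap is not thread-safe, hence the synchronized method.
    return runners.computeIfAbsent(phase, p -> factory.get());
  }
}
```

Looking runners up by key removes any dependence on the order in which the phases are started, which is the concern raised about indexing into a list.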
   

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
##########
@@ -100,73 +100,84 @@
   @GuardedBy("startStopLock")
   private boolean running = false;
 
-  TaskMonitor(IndexingServiceClient indexingServiceClient, int maxRetry, int estimatedNumSucceededTasks)
+  TaskMonitor(IndexingServiceClient indexingServiceClient, int maxSubtaskRetries, int estimatedNumSucceededTasks)
   {
     this.indexingServiceClient = Preconditions.checkNotNull(indexingServiceClient, "indexingServiceClient");
-    this.maxRetry = maxRetry;
+    this.maxSubtaskRetries = maxSubtaskRetries;
     this.estimatedNumSucceededTasks = estimatedNumSucceededTasks;
 
     log.info("TaskMonitor is initialized with estimatedNumSucceededTasks[%d]", estimatedNumSucceededTasks);
   }
 
-  public void start(long taskStatusCheckingPeriod)
+  public void start(long taskStatusCheckPeriodMs, long liveReportTimeoutMs)
   {
+    final long liveReportTimeoutNs = TimeUnit.MILLISECONDS.toNanos(liveReportTimeoutMs);
     synchronized (startStopLock) {
       running = true;
       log.info("Starting taskMonitor");
-      // NOTE: This polling can be improved to event-driven pushing by registering TaskRunnerListener to TaskRunner.
-      // That listener should be able to send the events reported to TaskRunner to this TaskMonitor.
+      // In Parallel task, subtasks periodically report their states with metrics. However, this could not be
+      // enough for monitoring subtask status because the report can be missing or even wrong for various reasons
+      // in distributed systems. TaskMonitor always checks the final status of subtask with the Overlord where
+      // is the source of truth for task statuses.
       taskStatusChecker.scheduleAtFixedRate(
           () -> {
             try {
               final Iterator<Entry<String, MonitorEntry>> iterator = runningTasks.entrySet().iterator();
               while (iterator.hasNext()) {
                 final Entry<String, MonitorEntry> entry = iterator.next();
-                final String specId = entry.getKey();
+                final String taskId = entry.getKey();
                 final MonitorEntry monitorEntry = entry.getValue();
-                final String taskId = monitorEntry.runningTask.getId();
-                final TaskStatusResponse taskStatusResponse = indexingServiceClient.getTaskStatus(taskId);
-                final TaskStatusPlus taskStatus = taskStatusResponse.getStatus();
-                if (taskStatus != null) {
-                  switch (Preconditions.checkNotNull(taskStatus.getStatusCode(), "taskState")) {
-                    case SUCCESS:
-                      incrementNumSucceededTasks();
-
-                      // Remote the current entry after updating taskHistories to make sure that task history
-                      // exists either runningTasks or taskHistories.
-                      monitorEntry.setLastStatus(taskStatus);
-                      iterator.remove();
-                      break;
-                    case FAILED:
-                      incrementNumFailedTasks();
-
-                      log.warn("task[%s] failed!", taskId);
-                      if (monitorEntry.numTries() < maxRetry) {
-                        log.info(
-                            "We still have more chances[%d/%d] to process the spec[%s].",
-                            monitorEntry.numTries(),
-                            maxRetry,
-                            monitorEntry.spec.getId()
-                        );
-                        retry(specId, monitorEntry, taskStatus);
-                      } else {
-                        log.error(
-                            "spec[%s] failed after [%d] tries",
-                            monitorEntry.spec.getId(),
-                            monitorEntry.numTries()
-                        );
-                        // Remote the current entry after updating taskHistories to make sure that task history
+
+                // We here measure the current time for individual subtask because it could take long time to talk to
+                // the Overlord.
+                if (monitorEntry.needStatusCheck(System.nanoTime(), liveReportTimeoutNs)) {
+                  final TaskStatusResponse taskStatusResponse = indexingServiceClient.getTaskStatus(taskId);
+                  final TaskStatusPlus taskStatus = taskStatusResponse.getStatus();
+                  if (taskStatus != null) {
+                    switch (Preconditions.checkNotNull(taskStatus.getStatusCode(), "taskState")) {
+                      case SUCCESS:
+                        incrementNumSucceededTasks();
+
+                        // Remove the current entry after updating taskHistories to make sure that task history
                         // exists either runningTasks or taskHistories.
                         monitorEntry.setLastStatus(taskStatus);
                         iterator.remove();
-                      }
-                      break;
-                    case RUNNING:
-                      monitorEntry.updateStatus(taskStatus);
-                      break;
-                    default:
-                      throw new ISE("Unknown taskStatus[%s] for task[%s[", taskStatus.getStatusCode(), taskId);
+                        break;
+                      case FAILED:
+                        incrementNumFailedTasks();
+
+                        log.warn("task[%s] failed!", taskId);
+                        if (monitorEntry.numTries() < maxSubtaskRetries) {
+                          log.info(
+                              "We still have more chances[%d/%d] to process the spec[%s].",
+                              monitorEntry.numTries(),
+                              maxSubtaskRetries,
+                              monitorEntry.spec.getId()
+                          );
+                          retry(monitorEntry, taskStatus);
+                        } else {
+                          log.error(
+                              "spec[%s] failed after [%d] tries",
+                              monitorEntry.spec.getId(),
+                              monitorEntry.numTries()
+                          );
+                          // Remove the current entry after updating taskHistories to make sure that task history
+                          // exists either runningTasks or taskHistories.
+                          monitorEntry.setLastStatus(taskStatus);
+                          iterator.remove();
+                        }
+                        break;
+                      case RUNNING:
+                        monitorEntry.updateRunningStatus(taskStatus);
+                        break;
+                      default:
+                        throw new ISE("Unknown taskStatus[%s] for task[%s[", taskStatus.getStatusCode(), taskId);

Review comment:
       `taskId` or `specId`?






[GitHub] [druid] stale[bot] commented on pull request #10376: Live reporting system for parallel task

Posted by GitBox <gi...@apache.org>.
stale[bot] commented on pull request #10376:
URL: https://github.com/apache/druid/pull/10376#issuecomment-743877597


   This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If you think that's incorrect or this pull request should instead be reviewed, please simply write any comment. Even if closed, you can still revive the PR at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.
   

