Posted to commits@druid.apache.org by "kfaraz (via GitHub)" <gi...@apache.org> on 2023/05/03 10:47:31 UTC

[GitHub] [druid] kfaraz commented on a diff in pull request #13758: add ingested intervals to task report

kfaraz commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1183402090


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java:
##########
@@ -1040,8 +1048,12 @@ private TaskStatus generateAndPublishSegments(
         );
 
         log.debugSegments(published.getSegments(), "Published segments");
-
-        toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
+        List<Interval> ingestedIntervals = published.getSegments()
+                                                    .stream()
+                                                    .map(DataSegment::getInterval)
+                                                    .collect(Collectors.toList());
+        toolbox.getTaskReportFileWriter().write(

Review Comment:
   Druid formatting style suggestion, for here and some other places:
   Try to fit a method/constructor call on a single line, except in unusual cases. If you do break it up, follow the guidelines below.
   
   https://github.com/apache/druid/blob/ad93635e4599adee2cf558cd1b9dbaaffbe9031e/codestyle/checkstyle.xml#L252-L276
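   
   A hedged sketch of what those guidelines would suggest for the new call (the second argument is assumed from the rest of this PR, not confirmed by this diff):
   
   ```java
   List<Interval> ingestedIntervals = published.getSegments()
                                               .stream()
                                               .map(DataSegment::getInterval)
                                               .collect(Collectors.toList());
   
   // Preferred: keep the call on one line when it fits.
   toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports(ingestedIntervals));
   
   // If it cannot fit, break it with one argument per line:
   toolbox.getTaskReportFileWriter().write(
       getId(),
       getTaskCompletionReports(ingestedIntervals)
   );
   ```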



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java:
##########
@@ -610,7 +610,8 @@ private Map<String, TaskReport> getTaskCompletionReports()
                 getTaskCompletionRowStats(),
                 errorMsg,
                 errorMsg == null,
-                0L
+                0L,
+                Collections.emptyList()

Review Comment:
   For realtime tasks, we should pass `ingestedIntervals` as `null`. Passing an empty list can be misleading here.
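   
   To make the distinction concrete, here is a tiny standalone sketch (the names are illustrative, not the actual report API):
   
   ```java
   import org.joda.time.Interval;
   
   import javax.annotation.Nullable;
   import java.util.List;
   
   class ReportPayloadSketch
   {
     // null  -> ingested intervals are unknown (realtime, or a task that failed mid-way)
     // empty -> we are certain that nothing was ingested
     @Nullable
     private final List<Interval> ingestedIntervals;
   
     ReportPayloadSketch(@Nullable List<Interval> ingestedIntervals)
     {
       this.ingestedIntervals = ingestedIntervals;
     }
   }
   ```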



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexStats.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.Pair;
+import org.joda.time.Interval;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class ParallelIndexStats
+{
+  private final Map<String, Object> rowStats;
+  private final Map<String, Object> unparseableEvents;
+  private final Set<Interval> ingestedIntervals;
+
+  public ParallelIndexStats()
+  {
+    this.rowStats = ImmutableMap.of();
+    this.unparseableEvents = ImmutableMap.of();
+    this.ingestedIntervals = ImmutableSet.of();
+  }
+
+  public ParallelIndexStats(
+      Map<String, Object> rowStats,
+      Map<String, Object> unparseableEvents,
+      Set<Interval> ingestedIntervals
+  )
+  {
+    this.rowStats = rowStats;
+    this.unparseableEvents = unparseableEvents;
+    this.ingestedIntervals = ingestedIntervals;
+  }
+
+  public Pair<Map<String, Object>, Map<String, Object>> toRowStatsAndUnparseableEvents()
+  {
+    return Pair.of(rowStats, unparseableEvents);
+  }
+
+  public Map<String, Object> getRowStats()
+  {
+    return rowStats;
+  }
+
+  public Map<String, Object> getUnparseableEvents()
+  {
+    return unparseableEvents;
+  }
+
+  public Set<Interval> getIngestedIntervals()
+  {
+    return ingestedIntervals;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ParallelIndexStats that = (ParallelIndexStats) o;
+    return rowStats.equals(that.rowStats)
+           && unparseableEvents.equals(that.unparseableEvents)
+           && ingestedIntervals.equals(

Review Comment:
   Please stick to the formatting style.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexStatsReporter.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskReport;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class ParallelIndexStatsReporter
+{
+  private static final Logger LOG = new Logger(ParallelIndexStatsReporter.class);
+
+  // Row ingestion meters fields
+  private static final String PROCESSED_FIELD = "processed";
+  private static final String PROCESSED_BYTES_FIELD = "processedBytes";
+  private static final String PROCESSED_WITH_ERROR_FIELD = "processedWithError";
+  private static final String THROWN_AWAY_FIELD = "thrownAway";
+  private static final String UNPARSEABLE_FIELD = "unparseable";
+  // Ingestion stats and errors report fields
+  private static final String INGESTION_STATS_AND_ERRORS_FIELD = "ingestionStatsAndErrors";
+  private static final String PAYLOAD_FIELD = "payload";
+  private static final String ROW_STATS_FIELD = "rowStats";
+  private static final String TOTALS_FIELD = "totals";
+  private static final String UNPARSEABLE_EVENTS_FIELD = "unparseableEvents";
+
+  abstract ParallelIndexStats report(

Review Comment:
   I took a look at this class and its sub-classes. I feel there is still some duplication between the single-phase and multi-phase implementations.
   
   For the overall design, here is what I am thinking (a rough skeleton follows below):
   - We probably don't need multiple implementations, and thus no need for the factory class either.
   - Have a single implementation, `ParallelIndexTaskStatsReporter`.
   - Pass the task to its constructor.
   - The `report` method (maybe renamed `reportStats` or `getStats` for less ambiguity) should take the following:
      - the set of running task ids
      - the map of reports (the new interface `TaskReportContainer` should be of use here)
      - we shouldn't have to pass the task again here, nor should we pass the indexing runner as a plain `Object`.
   - For parallel single-phase and multi-phase, the handling would be the same.
   - For sequential, we shouldn't be using `ParallelIndexTaskStatsReporter` at all because it is sequential, not parallel; we should continue to do what the original code was doing.
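   
   A rough skeleton of that shape (`TaskReportContainer` is the proposed interface; everything here is illustrative, not a final design):
   
   ```java
   import java.util.Map;
   import java.util.Set;
   
   public class ParallelIndexTaskStatsReporter
   {
     private final ParallelIndexSupervisorTask task;
   
     public ParallelIndexTaskStatsReporter(ParallelIndexSupervisorTask task)
     {
       this.task = task;
     }
   
     // a single implementation serving both parallel single-phase and multi-phase
     public ParallelIndexStats getStats(
         Set<String> runningTaskIds,
         Map<String, ? extends TaskReportContainer> completedSubtaskReports
     )
     {
       // aggregate row stats, unparseable events and ingested intervals here
       throw new UnsupportedOperationException("sketch only");
     }
   }
   ```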
   



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -1540,201 +1548,31 @@ public Response getCompleteSubTaskSpecAttemptHistory(
     }
   }
 
-  private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object buildSegmentsRowStats)
-  {
-    if (buildSegmentsRowStats instanceof RowIngestionMetersTotals) {
-      // This case is for unit tests. Normally when deserialized the row stats will appear as a Map<String, Object>.
-      return (RowIngestionMetersTotals) buildSegmentsRowStats;
-    } else if (buildSegmentsRowStats instanceof Map) {
-      Map<String, Object> buildSegmentsRowStatsMap = (Map<String, Object>) buildSegmentsRowStats;
-      return new RowIngestionMetersTotals(
-          ((Number) buildSegmentsRowStatsMap.get("processed")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("processedBytes")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("processedWithError")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue()
-      );
-    } else {
-      // should never happen
-      throw new RuntimeException("Unrecognized buildSegmentsRowStats type: " + buildSegmentsRowStats.getClass());
-    }
-  }
-
-  private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEventsParallelSinglePhase(
-      SinglePhaseParallelIndexTaskRunner parallelSinglePhaseRunner,
-      boolean includeUnparseable
-  )
-  {
-    final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
-
-    List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
-
-    // Get stats from completed tasks
-    Map<String, PushedSegmentsReport> completedSubtaskReports = parallelSinglePhaseRunner.getReports();
-    for (PushedSegmentsReport pushedSegmentsReport : completedSubtaskReports.values()) {
-      Map<String, TaskReport> taskReport = pushedSegmentsReport.getTaskReport();
-      if (taskReport == null || taskReport.isEmpty()) {
-        LOG.warn("Got an empty task report from subtask: " + pushedSegmentsReport.getTaskId());
-        continue;
-      }
-      RowIngestionMetersTotals rowIngestionMetersTotals =
-          getBuildSegmentsStatsFromTaskReport(taskReport, includeUnparseable, unparseableEvents);
-
-      buildSegmentsRowStats.addRowIngestionMetersTotals(rowIngestionMetersTotals);
-    }
-
-    RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
-        parallelSinglePhaseRunner.getRunningTaskIds(),
-        unparseableEvents,
-        includeUnparseable
-    );
-    buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
-
-    return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
-  }
-
-  private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEventsParallelMultiPhase(
-      ParallelIndexTaskRunner<?, ?> currentRunner,
-      boolean includeUnparseable
-  )
-  {
-    if (indexGenerateRowStats != null) {
-      return Pair.of(indexGenerateRowStats.lhs, includeUnparseable ? indexGenerateRowStats.rhs : ImmutableMap.of());
-    } else if (!currentRunner.getName().equals("partial segment generation")) {
-      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
-    } else {
-      Map<String, GeneratedPartitionsReport> completedSubtaskReports =
-          (Map<String, GeneratedPartitionsReport>) currentRunner.getReports();
-
-      final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
-      final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
-      for (GeneratedPartitionsReport generatedPartitionsReport : completedSubtaskReports.values()) {
-        Map<String, TaskReport> taskReport = generatedPartitionsReport.getTaskReport();
-        if (taskReport == null || taskReport.isEmpty()) {
-          LOG.warn("Got an empty task report from subtask: " + generatedPartitionsReport.getTaskId());
-          continue;
-        }
-        RowIngestionMetersTotals rowStatsForCompletedTask =
-            getBuildSegmentsStatsFromTaskReport(taskReport, true, unparseableEvents);
-
-        buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask);
-      }
-
-      RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
-          currentRunner.getRunningTaskIds(),
-          unparseableEvents,
-          includeUnparseable
-      );
-      buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
-
-      return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
-    }
-  }
-
-  private RowIngestionMetersTotals getRowStatsAndUnparseableEventsForRunningTasks(
-      Set<String> runningTaskIds,
-      List<ParseExceptionReport> unparseableEvents,
-      boolean includeUnparseable
-  )
+  @Nullable
+  Map<String, Object> fetchTaskReport(String taskId)
   {
-    final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
-    for (String runningTaskId : runningTaskIds) {
-      try {
-        final Map<String, Object> report = getTaskReport(toolbox.getOverlordClient(), runningTaskId);
-
-        if (report == null || report.isEmpty()) {
-          // task does not have a running report yet
-          continue;
-        }
-
-        Map<String, Object> ingestionStatsAndErrors = (Map<String, Object>) report.get("ingestionStatsAndErrors");
-        Map<String, Object> payload = (Map<String, Object>) ingestionStatsAndErrors.get("payload");
-        Map<String, Object> rowStats = (Map<String, Object>) payload.get("rowStats");
-        Map<String, Object> totals = (Map<String, Object>) rowStats.get("totals");
-        Map<String, Object> buildSegments = (Map<String, Object>) totals.get(RowIngestionMeters.BUILD_SEGMENTS);
-
-        if (includeUnparseable) {
-          Map<String, Object> taskUnparseableEvents = (Map<String, Object>) payload.get("unparseableEvents");
-          List<ParseExceptionReport> buildSegmentsUnparseableEvents = (List<ParseExceptionReport>)
-              taskUnparseableEvents.get(RowIngestionMeters.BUILD_SEGMENTS);
-          unparseableEvents.addAll(buildSegmentsUnparseableEvents);
-        }
-
-        buildSegmentsRowStats.addRowIngestionMetersTotals(getTotalsFromBuildSegmentsRowStats(buildSegments));
-      }
-      catch (Exception e) {
-        LOG.warn(e, "Encountered exception when getting live subtask report for task: " + runningTaskId);
-      }
+    try {
+      return getTaskReport(toolbox.getOverlordClient(), taskId);
     }
-    return buildSegmentsRowStats.getTotals();
-  }
-
-  private Pair<Map<String, Object>, Map<String, Object>> createStatsAndErrorsReport(
-      RowIngestionMetersTotals rowStats,
-      List<ParseExceptionReport> unparseableEvents
-  )
-  {
-    Map<String, Object> rowStatsMap = new HashMap<>();
-    Map<String, Object> totalsMap = new HashMap<>();
-    totalsMap.put(RowIngestionMeters.BUILD_SEGMENTS, rowStats);
-    rowStatsMap.put("totals", totalsMap);
-
-    return Pair.of(rowStatsMap, ImmutableMap.of(RowIngestionMeters.BUILD_SEGMENTS, unparseableEvents));
-  }
-
-  private RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(
-      Map<String, TaskReport> taskReport,
-      boolean includeUnparseable,
-      List<ParseExceptionReport> unparseableEvents)
-  {
-    IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport = (IngestionStatsAndErrorsTaskReport) taskReport.get(
-        IngestionStatsAndErrorsTaskReport.REPORT_KEY);
-    IngestionStatsAndErrorsTaskReportData reportData =
-        (IngestionStatsAndErrorsTaskReportData) ingestionStatsAndErrorsReport.getPayload();
-    RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
-        reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
-    );
-    if (includeUnparseable) {
-      List<ParseExceptionReport> taskUnparsebleEvents = (List<ParseExceptionReport>) reportData.getUnparseableEvents()
-                                                                    .get(RowIngestionMeters.BUILD_SEGMENTS);
-      unparseableEvents.addAll(taskUnparsebleEvents);
+    catch (Exception e) {
+      LOG.warn(e, "Encountered exception when getting live subtask report for task: " + taskId);
     }
-    return totals;
+    return null;
   }
 
-  private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEvents(
-      String full,
-      boolean includeUnparseable
-  )
+  private ParallelIndexStats doGetRowStatsAndUnparseableEvents(boolean full, boolean includeUnparseable)
   {
     if (currentSubTaskHolder == null) {
-      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+      return new ParallelIndexStats();
     }
 
     Object currentRunner = currentSubTaskHolder.getTask();
     if (currentRunner == null) {
-      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+      return new ParallelIndexStats();
     }
 
-    if (isParallelMode()) {

Review Comment:
   I think we can retain this if-else flow (but with some clean-up, as you have done); a sketch follows below.
   - For sequential, we will just return here.
   - For single-phase, we will create a `ParallelIndexStatsReporter` and do the computations.
   - For multi-phase, we will have a method in this class itself which checks whether the stats are already cached; otherwise, it constructs a `ParallelIndexStatsReporter`.
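   
   Roughly (the helper names here are placeholders, not code from this PR):
   
   ```java
   if (!isParallelMode()) {
     // sequential: keep doing what the original code did, and return here
     return getSequentialRowStats(includeUnparseable);
   } else if (isMultiPhase()) {
     // multi phase: return the cached stats if present, otherwise compute them
     return getOrComputeMultiPhaseStats(includeUnparseable, full);
   } else {
     // single phase: create a reporter and do the computations
     return new SinglePhaseParallelIndexStatsReporter()
         .report(this, currentRunner, includeUnparseable, full);
   }
   ```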



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java:
##########
@@ -323,7 +324,7 @@ public TaskStatus runTask(TaskToolbox toolbox)
       }
 
       errorMsg = Throwables.getStackTraceAsString(effectiveException);
-      toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
+      toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports(Collections.emptyList()));

Review Comment:
   I think we should pass an empty list only when we are certain that the task didn't ingest any interval. Otherwise, we should use `null`. In the future, we might want to record which intervals were ingested before the task failed; this could help in debugging a failing task.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexStats.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.Pair;
+import org.joda.time.Interval;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class ParallelIndexStats

Review Comment:
   Maybe `ParallelIndexTaskStats` instead?



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexStatsReporter.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskReport;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class ParallelIndexStatsReporter

Review Comment:
   Please add a small javadoc.
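   
   For example (wording is only a suggestion):
   
   ```java
   /**
    * Aggregates row stats, unparseable events and ingested intervals from the
    * sub-task reports of a {@link ParallelIndexSupervisorTask} into a
    * {@link ParallelIndexStats}.
    */
   public abstract class ParallelIndexStatsReporter
   ```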



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1105,7 +1105,8 @@ private Map<String, TaskReport> getTaskCompletionReports(@Nullable String errorM
                 getTaskCompletionRowStats(),
                 errorMsg,
                 errorMsg == null,
-                handoffWaitMs
+                handoffWaitMs,
+                Collections.emptyList()

Review Comment:
   Streaming ingestion should set `ingestedIntervals` to `null` for the time being.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexStatsReporter.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskReport;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class MultiPhaseParallelIndexStatsReporter extends ParallelIndexStatsReporter
+{
+  private static final Logger LOG = new Logger(MultiPhaseParallelIndexStatsReporter.class);
+
+  @Override
+  ParallelIndexStats report(
+      ParallelIndexSupervisorTask task,
+      Object runner,
+      boolean includeUnparseable,
+      boolean full
+  )
+  {
+    // use cached version if available
+    ParallelIndexStats cached = task.getIndexGenerateRowStats();
+    if (null != cached) {
+      return cached;
+    }
+
+    ParallelIndexTaskRunner<?, ?> currentRunner = (ParallelIndexTaskRunner<?, ?>) runner;
+    if (!InputSourceSplitParallelIndexTaskRunner.PARTIAL_SEGMENT_GENERATION_PHASE.equals(currentRunner.getName())) {
+      return new ParallelIndexStats();
+    }
+
+    Map<String, GeneratedPartitionsReport> completedSubtaskReports =
+        (Map<String, GeneratedPartitionsReport>) currentRunner.getReports();
+
+    final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
+    final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
+    Set<Interval> intervalsIngested = new HashSet<>();
+    for (GeneratedPartitionsReport generatedPartitionsReport : completedSubtaskReports.values()) {
+      Map<String, TaskReport> taskReport = generatedPartitionsReport.getTaskReport();
+      if (taskReport == null || taskReport.isEmpty()) {
+        LOG.warn("Got an empty task report from subtask: " + generatedPartitionsReport.getTaskId());
+        continue;
+      }
+
+      IngestionStatsAndErrorsTaskReport iseReport =
+          (IngestionStatsAndErrorsTaskReport) taskReport.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY);
+      IngestionStatsAndErrorsTaskReportData payload = (IngestionStatsAndErrorsTaskReportData) iseReport.getPayload();
+      intervalsIngested.addAll(payload.getIngestedIntervals());
+
+      RowIngestionMetersTotals rowStatsForCompletedTask =
+          getBuildSegmentsStatsFromTaskReport(taskReport, true, unparseableEvents);
+
+      buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask);
+    }
+
+    RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
+        task,
+        currentRunner.getRunningTaskIds(),
+        unparseableEvents,
+        includeUnparseable
+    );
+    buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
+
+    Pair<Map<String, Object>, Map<String, Object>> report =
+        createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
+
+    return new ParallelIndexStats(

Review Comment:
   See the formatting comment above.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -1540,201 +1548,31 @@ public Response getCompleteSubTaskSpecAttemptHistory(
     }
   }
 
-  private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object buildSegmentsRowStats)
-  {
-    if (buildSegmentsRowStats instanceof RowIngestionMetersTotals) {
-      // This case is for unit tests. Normally when deserialized the row stats will appear as a Map<String, Object>.
-      return (RowIngestionMetersTotals) buildSegmentsRowStats;
-    } else if (buildSegmentsRowStats instanceof Map) {
-      Map<String, Object> buildSegmentsRowStatsMap = (Map<String, Object>) buildSegmentsRowStats;
-      return new RowIngestionMetersTotals(
-          ((Number) buildSegmentsRowStatsMap.get("processed")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("processedBytes")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("processedWithError")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue()
-      );
-    } else {
-      // should never happen
-      throw new RuntimeException("Unrecognized buildSegmentsRowStats type: " + buildSegmentsRowStats.getClass());
-    }
-  }
-
-  private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEventsParallelSinglePhase(
-      SinglePhaseParallelIndexTaskRunner parallelSinglePhaseRunner,
-      boolean includeUnparseable
-  )
-  {
-    final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
-
-    List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
-
-    // Get stats from completed tasks
-    Map<String, PushedSegmentsReport> completedSubtaskReports = parallelSinglePhaseRunner.getReports();
-    for (PushedSegmentsReport pushedSegmentsReport : completedSubtaskReports.values()) {
-      Map<String, TaskReport> taskReport = pushedSegmentsReport.getTaskReport();
-      if (taskReport == null || taskReport.isEmpty()) {
-        LOG.warn("Got an empty task report from subtask: " + pushedSegmentsReport.getTaskId());
-        continue;
-      }
-      RowIngestionMetersTotals rowIngestionMetersTotals =
-          getBuildSegmentsStatsFromTaskReport(taskReport, includeUnparseable, unparseableEvents);
-
-      buildSegmentsRowStats.addRowIngestionMetersTotals(rowIngestionMetersTotals);
-    }
-
-    RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
-        parallelSinglePhaseRunner.getRunningTaskIds(),
-        unparseableEvents,
-        includeUnparseable
-    );
-    buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
-
-    return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
-  }
-
-  private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEventsParallelMultiPhase(
-      ParallelIndexTaskRunner<?, ?> currentRunner,
-      boolean includeUnparseable
-  )
-  {
-    if (indexGenerateRowStats != null) {
-      return Pair.of(indexGenerateRowStats.lhs, includeUnparseable ? indexGenerateRowStats.rhs : ImmutableMap.of());
-    } else if (!currentRunner.getName().equals("partial segment generation")) {
-      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
-    } else {
-      Map<String, GeneratedPartitionsReport> completedSubtaskReports =
-          (Map<String, GeneratedPartitionsReport>) currentRunner.getReports();
-
-      final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
-      final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
-      for (GeneratedPartitionsReport generatedPartitionsReport : completedSubtaskReports.values()) {
-        Map<String, TaskReport> taskReport = generatedPartitionsReport.getTaskReport();
-        if (taskReport == null || taskReport.isEmpty()) {
-          LOG.warn("Got an empty task report from subtask: " + generatedPartitionsReport.getTaskId());
-          continue;
-        }
-        RowIngestionMetersTotals rowStatsForCompletedTask =
-            getBuildSegmentsStatsFromTaskReport(taskReport, true, unparseableEvents);
-
-        buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask);
-      }
-
-      RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
-          currentRunner.getRunningTaskIds(),
-          unparseableEvents,
-          includeUnparseable
-      );
-      buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
-
-      return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
-    }
-  }
-
-  private RowIngestionMetersTotals getRowStatsAndUnparseableEventsForRunningTasks(
-      Set<String> runningTaskIds,
-      List<ParseExceptionReport> unparseableEvents,
-      boolean includeUnparseable
-  )
+  @Nullable
+  Map<String, Object> fetchTaskReport(String taskId)
   {
-    final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();

Review Comment:
   Thanks for moving all of this logic to the new class! Seems much cleaner now.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -591,7 +595,7 @@ private static boolean useRangePartitions(ParallelIndexTuningConfig tuningConfig
     return tuningConfig.getGivenOrDefaultPartitionsSpec() instanceof DimensionRangePartitionsSpec;
   }
 
-  private boolean isParallelMode()
+  boolean isParallelMode()
   {
     return isParallelMode(baseInputSource, ingestionSchema.getTuningConfig());

Review Comment:
   Seems like this should be computed just once when this task is instantiated and assigned to a final field. But we need not do it in this PR.
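   
   i.e., something along these lines (a sketch only, since it's out of scope for this PR):
   
   ```java
   // Sketch: evaluate once at construction; the inputs don't change afterwards.
   private final boolean parallelMode;
   
   // ... in the constructor:
   // this.parallelMode = isParallelMode(baseInputSource, ingestionSchema.getTuningConfig());
   ```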



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java:
##########
@@ -246,7 +249,8 @@ private Map<String, TaskReport> getTaskCompletionReports()
                 getTaskCompletionRowStats(),
                 "",
                 false, // not applicable for parallel subtask
-                segmentAvailabilityWaitTimeMs
+                segmentAvailabilityWaitTimeMs,
+                intervals

Review Comment:
   @danprince1, condensing the intervals would still help in reducing the payload size of the report sent from the sub-task to the parent task.
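   
   For example, something like this before building the report (assuming `JodaUtils.condenseIntervals` is usable here):
   
   ```java
   import org.apache.druid.java.util.common.JodaUtils;
   
   // Merge abutting/overlapping intervals so the payload shipped to the
   // parent task stays small, e.g. a month of day-granularity segments
   // condenses from ~30 intervals down to one.
   List<Interval> condensedIntervals = JodaUtils.condenseIntervals(intervals);
   ```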



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexStatsReporter.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskReport;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class MultiPhaseParallelIndexStatsReporter extends ParallelIndexStatsReporter
+{
+  private static final Logger LOG = new Logger(MultiPhaseParallelIndexStatsReporter.class);
+
+  @Override
+  ParallelIndexStats report(
+      ParallelIndexSupervisorTask task,
+      Object runner,
+      boolean includeUnparseable,
+      boolean full
+  )
+  {
+    // use cached version if available
+    ParallelIndexStats cached = task.getIndexGenerateRowStats();
+    if (null != cached) {
+      return cached;
+    }
+
+    ParallelIndexTaskRunner<?, ?> currentRunner = (ParallelIndexTaskRunner<?, ?>) runner;
+    if (!InputSourceSplitParallelIndexTaskRunner.PARTIAL_SEGMENT_GENERATION_PHASE.equals(currentRunner.getName())) {
+      return new ParallelIndexStats();
+    }

Review Comment:
   Ideally, this logic should remain in the calling task itself. The task should call the report method only when we want the computation to happen.
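   
   i.e., at the call site, something like (a sketch reusing names from this diff; `reporter` is hypothetical):
   
   ```java
   // The calling task decides whether the computation should happen at all.
   ParallelIndexStats stats = getIndexGenerateRowStats();
   if (stats == null
       && InputSourceSplitParallelIndexTaskRunner.PARTIAL_SEGMENT_GENERATION_PHASE
                                                 .equals(currentRunner.getName())) {
     stats = reporter.report(this, currentRunner, includeUnparseable, full);
   }
   ```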



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

