You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by "danprince1 (via GitHub)" <gi...@apache.org> on 2023/02/06 15:33:38 UTC

[GitHub] [druid] danprince1 opened a new pull request, #13758: add ingested intervals to task report

danprince1 opened a new pull request, #13758:
URL: https://github.com/apache/druid/pull/13758

   <!-- Thanks for trying to help us make Apache Druid be the best it can be! Please fill out as much of the following information as is possible (where relevant, and remove it when irrelevant) to help make the intention and scope of this PR clear in order to ease review. -->
   
   <!-- Please read the doc for contribution (https://github.com/apache/druid/blob/master/CONTRIBUTING.md) before making this PR. Also, once you open a PR, please _avoid using force pushes and rebasing_ since these make it difficult for reviewers to see what you've changed in response to their reviews. See [the 'If your pull request shows conflicts with master' section](https://github.com/apache/druid/blob/master/CONTRIBUTING.md#if-your-pull-request-shows-conflicts-with-master) for more details. -->
   
   
   
   <!-- Replace XXXX with the id of the issue fixed in this PR. Remove this section if there is no corresponding issue. Don't reference the issue in the title of this pull-request. -->
   
   <!-- If you are a committer, follow the PR action item checklist for committers:
   https://github.com/apache/druid/blob/master/dev/committer-instructions.md#pr-and-issue-action-item-checklist-for-committers. -->
   
   ### Description
   
   For batch ingestion (native batch tasks and the Hadoop batch task), this PR adds a list of the time intervals ingested to the completion report.  This list represents the time interval of each segment created by the task, condensed into as few intervals as possible.  For example, if four segments were created at 'day' granularity for March 1st, 2nd, 3rd, and 5th, the report's `ingestedIntervals` list would contain two intervals, `["2023-03-01T00:00:00Z/2023-03-04T00:00:00Z", "2023-03-05T00:00:00Z/2023-03-06T00:00:00Z"]`. 
   
   <!-- Describe the goal of this PR, what problem are you fixing. If there is a corresponding issue (referenced above), it's not necessary to repeat the description here, however, you may choose to keep one summary sentence. -->
   
   For IndexTask and the Hadoop task the changes here are fairly straightforward.  For parallel tasks, the code in `ParallelIndexSupervisorTask` gathers intervals ingested from subtasks and adds them to the report.  But this task was already a bit hairy, so I extracted it into a new class, `ParallelIndexStatsReporter` and implementation subclasses.  The implementation is created by a factory based on the configuration of the parallel task.
   
   <!-- Describe your patch: what did you change in code? How did you fix the problem? -->
   
   <!-- If there are several relatively logically separate changes in this PR, create a mini-section for each of them. For example: -->
   
   
   
   <!--
   In each section, please describe design decisions made, including:
    - Choice of algorithms
    - Behavioral aspects. What configuration values are acceptable? How are corner cases and error conditions handled, such as when there are insufficient resources?
    - Class organization and design (how the logic is split between classes, inheritance, composition, design patterns)
    - Method organization and design (how the logic is split between methods, parameters and return types)
    - Naming (class, method, API, configuration, HTTP endpoint, names of emitted metrics)
   -->
   
   
   <!-- It's good to describe an alternative design (or mention an alternative name) for every design (or naming) decision point and compare the alternatives with the designs that you've implemented (or the names you've chosen) to highlight the advantages of the chosen designs and names. -->
   
   <!-- If there was a discussion of the design of the feature implemented in this PR elsewhere (e. g. a "Proposal" issue, any other issue, or a thread in the development mailing list), link to that discussion from this PR description and explain what have changed in your final design compared to your original proposal or the consensus version in the end of the discussion. If something hasn't changed since the original discussion, you can omit a detailed discussion of those aspects of the design here, perhaps apart from brief mentioning for the sake of readability of this PR description. -->
   
   <!-- Some of the aspects mentioned above may be omitted for simple and small changes. -->
   
   #### Release note
   
   For batch ingestion (native batch tasks and the Hadoop batch task), the completion report contains a condensed list of the time intervals ingested.
   
   <!-- Give your best effort to summarize your changes in a couple of sentences aimed toward Druid users. 
   
   If your change doesn't have end user impact, you can skip this section.
   
   For tips about how to write a good release note, see [Release notes](https://github.com/apache/druid/blob/master/CONTRIBUTING.md#release-notes).
   
   -->
   
   
   <hr>
   
   ##### Key changed/added classes in this PR
    * `ParallelIndexSupervisorTask`
    * `IndexTask`
    * `HadoopIndexTask`
    * `IngestionStatsAndErrorsTaskReportData`
   
   <hr>
   
   <!-- Check the items by putting "x" in the brackets for the done things. Not all of these items apply to every PR. Remove the items which are not done or not relevant to the PR. None of the items from the checklist below are strictly necessary, but it would be very helpful if you at least self-review the PR. -->
   
   This PR has:
   
   - [x] been self-reviewed.
   - [x] added documentation for new or modified features or behaviors.
   - [x] a release note entry in the PR description.
   - [x] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [x] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [x] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [x] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] danprince1 commented on a diff in pull request #13758: add ingested intervals to task report

Posted by "danprince1 (via GitHub)" <gi...@apache.org>.
danprince1 commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1154889605


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexStatsReporter.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskReport;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class ParallelIndexStatsReporter
+{
+  private static final Logger LOG = new Logger(ParallelIndexStatsReporter.class);
+
+  abstract ParallelIndexStats report(
+      ParallelIndexSupervisorTask task,
+      Object runner,
+      boolean includeUnparseable,
+      String full
+  );
+
+  protected RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(
+      Map<String, TaskReport> taskReport,
+      boolean includeUnparseable,
+      List<ParseExceptionReport> unparseableEvents
+  )
+  {
+    IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport =
+        (IngestionStatsAndErrorsTaskReport) taskReport.get(
+            IngestionStatsAndErrorsTaskReport.REPORT_KEY);
+    IngestionStatsAndErrorsTaskReportData reportData =
+        (IngestionStatsAndErrorsTaskReportData) ingestionStatsAndErrorsReport.getPayload();
+    RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
+        reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
+    );
+    if (includeUnparseable) {
+      List<ParseExceptionReport> taskUnparsebleEvents =
+          (List<ParseExceptionReport>) reportData.getUnparseableEvents().get(RowIngestionMeters.BUILD_SEGMENTS);
+      unparseableEvents.addAll(taskUnparsebleEvents);
+    }
+    return totals;
+  }
+
+  private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object buildSegmentsRowStats)
+  {
+    if (buildSegmentsRowStats instanceof RowIngestionMetersTotals) {
+      // This case is for unit tests. Normally when deserialized the row stats will apppear as a Map<String, Object>.
+      return (RowIngestionMetersTotals) buildSegmentsRowStats;
+    } else if (buildSegmentsRowStats instanceof Map) {
+      Map<String, Object> buildSegmentsRowStatsMap = (Map<String, Object>) buildSegmentsRowStats;
+      return new RowIngestionMetersTotals(
+          ((Number) buildSegmentsRowStatsMap.get("processed")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("processedBytes")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("processedWithError")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue()

Review Comment:
   Good idea - done.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexStatsReporter.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskReport;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class ParallelIndexStatsReporter
+{
+  private static final Logger LOG = new Logger(ParallelIndexStatsReporter.class);
+
+  abstract ParallelIndexStats report(
+      ParallelIndexSupervisorTask task,
+      Object runner,
+      boolean includeUnparseable,
+      String full
+  );
+
+  protected RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(
+      Map<String, TaskReport> taskReport,
+      boolean includeUnparseable,
+      List<ParseExceptionReport> unparseableEvents
+  )
+  {
+    IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport =
+        (IngestionStatsAndErrorsTaskReport) taskReport.get(
+            IngestionStatsAndErrorsTaskReport.REPORT_KEY);
+    IngestionStatsAndErrorsTaskReportData reportData =
+        (IngestionStatsAndErrorsTaskReportData) ingestionStatsAndErrorsReport.getPayload();
+    RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
+        reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
+    );
+    if (includeUnparseable) {
+      List<ParseExceptionReport> taskUnparsebleEvents =
+          (List<ParseExceptionReport>) reportData.getUnparseableEvents().get(RowIngestionMeters.BUILD_SEGMENTS);
+      unparseableEvents.addAll(taskUnparsebleEvents);
+    }
+    return totals;
+  }
+
+  private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object buildSegmentsRowStats)
+  {
+    if (buildSegmentsRowStats instanceof RowIngestionMetersTotals) {
+      // This case is for unit tests. Normally when deserialized the row stats will apppear as a Map<String, Object>.
+      return (RowIngestionMetersTotals) buildSegmentsRowStats;
+    } else if (buildSegmentsRowStats instanceof Map) {
+      Map<String, Object> buildSegmentsRowStatsMap = (Map<String, Object>) buildSegmentsRowStats;
+      return new RowIngestionMetersTotals(
+          ((Number) buildSegmentsRowStatsMap.get("processed")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("processedBytes")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("processedWithError")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue()
+      );
+    } else {
+      // should never happen
+      throw new RuntimeException("Unrecognized buildSegmentsRowStats type: " + buildSegmentsRowStats.getClass());

Review Comment:
   Good idea - done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] tejaswini-imply commented on pull request #13758: add ingested intervals to task report

Posted by "tejaswini-imply (via GitHub)" <gi...@apache.org>.
tejaswini-imply commented on PR #13758:
URL: https://github.com/apache/druid/pull/13758#issuecomment-1509116891

   Could you also please resolve the conflicts?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] tejaswini-imply commented on a diff in pull request #13758: add ingested intervals to task report

Posted by "tejaswini-imply (via GitHub)" <gi...@apache.org>.
tejaswini-imply commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1177630203


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SequentialParallelIndexStatsReporter.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import org.apache.druid.indexing.common.task.IndexTask;
+
+import java.util.Collections;
+
+public class SequentialParallelIndexStatsReporter extends ParallelIndexStatsReporter
+{
+  @Override
+  ParallelIndexStats report(
+      ParallelIndexSupervisorTask task,
+      Object runner,
+      boolean includeUnparseable,
+      boolean full
+  )
+  {
+    IndexTask currentSequentialTask = (IndexTask) runner;
+    return new ParallelIndexStats(
+        currentSequentialTask.doGetRowStats(full),
+        currentSequentialTask.doGetUnparseableEvents(full),
+        Collections.emptySet()

Review Comment:
   Oh ok this is for live reports. My bad!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] danprince1 commented on a diff in pull request #13758: add ingested intervals to task report

Posted by "danprince1 (via GitHub)" <gi...@apache.org>.
danprince1 commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1154889307


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java:
##########
@@ -534,7 +536,7 @@ public TaskStatus runTask(final TaskToolbox toolbox)
     catch (Exception e) {
       log.error(e, "Encountered exception in %s.", ingestionState);
       errorMsg = Throwables.getStackTraceAsString(e);
-      toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
+      toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports(Collections.emptyList()));

Review Comment:
   Good idea - done.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexStatsReporter.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskReport;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class MultiPhaseParallelIndexStatsReporter extends ParallelIndexStatsReporter
+{
+  private static final Logger LOG = new Logger(MultiPhaseParallelIndexStatsReporter.class);
+
+  @Override
+  ParallelIndexStats report(
+      ParallelIndexSupervisorTask task,
+      Object runner,
+      boolean includeUnparseable,
+      String full
+  )
+  {
+    // use cached version if available
+    ParallelIndexStats cached = task.getIndexGenerateRowStats();
+    if (null != cached) {
+      return cached;
+    }
+
+    ParallelIndexTaskRunner<?, ?> currentRunner = (ParallelIndexTaskRunner<?, ?>) runner;
+    if (!currentRunner.getName().equals("partial segment generation")) {

Review Comment:
   Good idea - done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] danprince1 commented on a diff in pull request #13758: add ingested intervals to task report

Posted by "danprince1 (via GitHub)" <gi...@apache.org>.
danprince1 commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1174082032


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SequentialParallelIndexStatsReporter.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import org.apache.druid.indexing.common.task.IndexTask;
+
+import java.util.Collections;
+
+public class SequentialParallelIndexStatsReporter extends ParallelIndexStatsReporter
+{
+  @Override
+  ParallelIndexStats report(
+      ParallelIndexSupervisorTask task,
+      Object runner,
+      boolean includeUnparseable,
+      boolean full
+  )
+  {
+    IndexTask currentSequentialTask = (IndexTask) runner;
+    return new ParallelIndexStats(
+        currentSequentialTask.doGetRowStats(full),
+        currentSequentialTask.doGetUnparseableEvents(full),
+        Collections.emptySet()

Review Comment:
   When `ParallelIndexSupervisorTask` is in sequential mode, it just creates an `IndexTask` and runs it.  And `IndexTask` already has logic to report intervals in its Completion report, as you saw above.  We just return row stats and unparseable events here for intermediate reports, before the task is done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] tejaswini-imply commented on pull request #13758: add ingested intervals to task report

Posted by "tejaswini-imply (via GitHub)" <gi...@apache.org>.
tejaswini-imply commented on PR #13758:
URL: https://github.com/apache/druid/pull/13758#issuecomment-1519879542

   Thanks for the changes and explanation @danprince1, Could you please look into failing checks?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] danprince1 commented on a diff in pull request #13758: add ingested intervals to task report

Posted by "danprince1 (via GitHub)" <gi...@apache.org>.
danprince1 commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1174037991


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java:
##########
@@ -640,7 +643,7 @@ private Map<String, Object> getTaskCompletionRowStats()
    **
    * @return
    */
-  private Map<String, TaskReport> getTaskCompletionReports()
+  private Map<String, TaskReport> getTaskCompletionReports(List<Interval> ingestedIntervals)

Review Comment:
   See my comment above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] danprince1 commented on a diff in pull request #13758: add ingested intervals to task report

Posted by "danprince1 (via GitHub)" <gi...@apache.org>.
danprince1 commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1154890424


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -767,7 +770,8 @@ TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Except
       );
       return TaskStatus.failure(getId(), errMsg);
     }
-    indexGenerateRowStats = doGetRowStatsAndUnparseableEventsParallelMultiPhase(indexingRunner, true);
+
+    indexGenerateRowStats = new MultiPhaseParallelIndexStatsReporter().report(this, indexingRunner, true, "full");

Review Comment:
   Turns out this string 'full' parameter was pretty wonky to begin with, so I changed it to a boolean, which seemed to make much more sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] tejaswini-imply commented on a diff in pull request #13758: add ingested intervals to task report

Posted by "tejaswini-imply (via GitHub)" <gi...@apache.org>.
tejaswini-imply commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1177626070


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java:
##########
@@ -573,7 +580,8 @@ private Map<String, TaskReport> getTaskCompletionReports()
                 getTaskCompletionRowStats(),
                 errorMsg,
                 segmentAvailabilityConfirmationCompleted,
-                segmentAvailabilityWaitTimeMs
+                segmentAvailabilityWaitTimeMs,
+                JodaUtils.condenseIntervals(intervals)

Review Comment:
   Yeah, that makes sense. I just went with that choice to be consistent. This works as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] kfaraz commented on pull request #13758: add ingested intervals to task report

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on PR #13758:
URL: https://github.com/apache/druid/pull/13758#issuecomment-1560458478

   @danprince1 , please let me know if you have any thoughts on the feedback. I was hoping we could close this soon.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


Re: [PR] add ingested intervals to task report (druid)

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #13758:
URL: https://github.com/apache/druid/pull/13758#issuecomment-1926007165

   This pull request has been marked as stale due to 60 days of inactivity.
   It will be closed in 4 weeks if no further activity occurs. If you think
   that's incorrect or this pull request should instead be reviewed, please simply
   write any comment. Even if closed, you can still revive the PR at any time or
   discuss it on the dev@druid.apache.org list.
   Thank you for your contributions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] paul-rogers commented on pull request #13758: add ingested intervals to task report

Posted by "paul-rogers (via GitHub)" <gi...@apache.org>.
paul-rogers commented on PR #13758:
URL: https://github.com/apache/druid/pull/13758#issuecomment-1441110089

   Tests are failing due to insufficient branch coverage. Can you add some unit tests to cover the new code? Sometimes doing so is quite difficult. It that is the case here, please explain why so we can figure out a Plan B.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] danprince1 commented on pull request #13758: add ingested intervals to task report

Posted by "danprince1 (via GitHub)" <gi...@apache.org>.
danprince1 commented on PR #13758:
URL: https://github.com/apache/druid/pull/13758#issuecomment-1468734961

   @paul-rogers thanks so much for your review.  I'm about to go on vacation, so I won't get to a chance to address your suggestions for a while, but I really appreciate your looking at this and I will try to get back to this as soon as I can.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] tejaswini-imply commented on a diff in pull request #13758: add ingested intervals to task report

Posted by "tejaswini-imply (via GitHub)" <gi...@apache.org>.
tejaswini-imply commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1177628929


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java:
##########
@@ -651,7 +654,8 @@ private Map<String, TaskReport> getTaskCompletionReports()
                 getTaskCompletionRowStats(),
                 errorMsg,
                 false, // not applicable for parallel subtask
-                segmentAvailabilityWaitTimeMs
+                segmentAvailabilityWaitTimeMs,
+                ingestedIntervals

Review Comment:
   Got it! Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] kfaraz commented on a diff in pull request #13758: add ingested intervals to task report

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1183421389


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexStats.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.Pair;
+import org.joda.time.Interval;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class ParallelIndexStats

Review Comment:
   Maybe rename to `ParallelIndexTaskStats` for clarity.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] danprince1 commented on pull request #13758: add ingested intervals to task report

Posted by "danprince1 (via GitHub)" <gi...@apache.org>.
danprince1 commented on PR #13758:
URL: https://github.com/apache/druid/pull/13758#issuecomment-1421168454

   > Thanks for your first PR, @danprince1 ! Could you please add a note in the PR description on the uses of `ingestedIntervals`?
   
   Hi @kfaraz, I added a section about potential uses to the description.  Let me know of any other suggestions - thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] vtlim commented on a diff in pull request #13758: add ingested intervals to task report

Posted by "vtlim (via GitHub)" <gi...@apache.org>.
vtlim commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1154926674


##########
docs/ingestion/tasks.md:
##########
@@ -98,6 +101,14 @@ For some task types, the indexing task can wait for the newly ingested segments
 |`segmentAvailabilityConfirmed`|Whether all segments generated by this ingestion task had been confirmed as available for queries in the cluster before the task completed.|
 |`segmentAvailabilityWaitTimeMs`|Milliseconds waited by the ingestion task for the newly ingested segments to be available for query after completing ingestion was completed.|
 
+#### Ingested Intervals

Review Comment:
   Docs look good. Only nit is to use sentence case for subheadings: `Ingested intervals`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] danprince1 commented on pull request #13758: add ingested intervals to task report

Posted by "danprince1 (via GitHub)" <gi...@apache.org>.
danprince1 commented on PR #13758:
URL: https://github.com/apache/druid/pull/13758#issuecomment-1492601211

   I ran coverage locally and added some tests until it passed, so hopefully that is resolved as well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] danprince1 commented on pull request #13758: add ingested intervals to task report

Posted by "danprince1 (via GitHub)" <gi...@apache.org>.
danprince1 commented on PR #13758:
URL: https://github.com/apache/druid/pull/13758#issuecomment-1521683220

   > Thanks for the changes and explanation @danprince1, Could you please look into failing checks?
   
   Fixed!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] tejaswini-imply commented on a diff in pull request #13758: add ingested intervals to task report

Posted by "tejaswini-imply (via GitHub)" <gi...@apache.org>.
tejaswini-imply commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1167046898


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java:
##########
@@ -573,7 +580,8 @@ private Map<String, TaskReport> getTaskCompletionReports()
                 getTaskCompletionRowStats(),
                 errorMsg,
                 segmentAvailabilityConfirmationCompleted,
-                segmentAvailabilityWaitTimeMs
+                segmentAvailabilityWaitTimeMs,
+                JodaUtils.condenseIntervals(intervals)

Review Comment:
   Rather taking intervals as input and condesing here, Could we create new method `getIngestedIntervals()` which has the logic to condense there and maybe we can maintain class wide variable `ingestedIntervals` like `buildSegmentsMeters`, `ingestionState`. IMO it'll be consistent with existing structure that way and we don't have to pass intervals everytime we write reports and overloading `getTaskCompletionReports()` wouldn't be necessary as well.



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/ClusterConfigTest.java:
##########
@@ -37,11 +38,12 @@
 public class ClusterConfigTest
 {
   @Test
+  @Ignore
   public void testYaml()
   {
     ClusterConfig config = ClusterConfig.loadFromResource("/config-test/test.yaml");
     // Uncomment this line to see the full config with includes resolved.
-    //System.out.println(config.resolveIncludes());
+    System.out.println(config.resolveIncludes());

Review Comment:
   Could you please remove this debug line.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java:
##########
@@ -651,7 +654,8 @@ private Map<String, TaskReport> getTaskCompletionReports()
                 getTaskCompletionRowStats(),
                 errorMsg,
                 false, // not applicable for parallel subtask
-                segmentAvailabilityWaitTimeMs
+                segmentAvailabilityWaitTimeMs,
+                ingestedIntervals

Review Comment:
   Do we not need to condense the intervals here?



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java:
##########
@@ -640,7 +643,7 @@ private Map<String, Object> getTaskCompletionRowStats()
    **
    * @return
    */
-  private Map<String, TaskReport> getTaskCompletionReports()
+  private Map<String, TaskReport> getTaskCompletionReports(List<Interval> ingestedIntervals)

Review Comment:
   Same comment applies here as suggested in `IndexTask` class.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java:
##########
@@ -695,7 +696,8 @@ private Map<String, TaskReport> getTaskCompletionReports()
                 getTaskCompletionRowStats(),
                 errorMsg,
                 segmentAvailabilityConfirmationCompleted,
-                segmentAvailabilityWaitTimeMs
+                segmentAvailabilityWaitTimeMs,
+                JodaUtils.condenseIntervals(intervals)

Review Comment:
   Same comments as in `IndexTask` class.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SequentialParallelIndexStatsReporter.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import org.apache.druid.indexing.common.task.IndexTask;
+
+import java.util.Collections;
+
+public class SequentialParallelIndexStatsReporter extends ParallelIndexStatsReporter
+{
+  @Override
+  ParallelIndexStats report(
+      ParallelIndexSupervisorTask task,
+      Object runner,
+      boolean includeUnparseable,
+      boolean full
+  )
+  {
+    IndexTask currentSequentialTask = (IndexTask) runner;
+    return new ParallelIndexStats(
+        currentSequentialTask.doGetRowStats(full),
+        currentSequentialTask.doGetUnparseableEvents(full),
+        Collections.emptySet()

Review Comment:
   Can you please elaborate why we are updating stats with empty intervals set here? Shouldn't we report intervals for sequential task as well?



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java:
##########
@@ -246,7 +249,8 @@ private Map<String, TaskReport> getTaskCompletionReports()
                 getTaskCompletionRowStats(),
                 "",
                 false, // not applicable for parallel subtask
-                segmentAvailabilityWaitTimeMs
+                segmentAvailabilityWaitTimeMs,
+                intervals

Review Comment:
   Do we not need to condense the intervals here?



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java:
##########
@@ -235,7 +238,7 @@ private List<DataSegment> generateSegments(
   /**
    * Generate an IngestionStatsAndErrorsTaskReport for the task.
    */
-  private Map<String, TaskReport> getTaskCompletionReports()
+  private Map<String, TaskReport> getTaskCompletionReports(List<Interval> intervals)

Review Comment:
   Same comment applies here as suggested in `IndexTask` class.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexStatsReporterFactory.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
+
+public class ParallelIndexStatsReporterFactory
+{
+  ParallelIndexStatsReporter create(ParallelIndexSupervisorTask task)
+  {
+    if (task.isParallelMode()) {
+      if (AbstractBatchIndexTask.isGuaranteedRollup(

Review Comment:
   Maybe we could replace this with `task.isPerfectRollup()` for simplicity?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] danprince1 commented on a diff in pull request #13758: add ingested intervals to task report

Posted by "danprince1 (via GitHub)" <gi...@apache.org>.
danprince1 commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1174037715


##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/config/ClusterConfigTest.java:
##########
@@ -37,11 +38,12 @@
 public class ClusterConfigTest
 {
   @Test
+  @Ignore
   public void testYaml()
   {
     ClusterConfig config = ClusterConfig.loadFromResource("/config-test/test.yaml");
     // Uncomment this line to see the full config with includes resolved.
-    //System.out.println(config.resolveIncludes());
+    System.out.println(config.resolveIncludes());

Review Comment:
   Good catch - I'll re-comment it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] kfaraz commented on a diff in pull request #13758: add ingested intervals to task report

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1183402090


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java:
##########
@@ -1040,8 +1048,12 @@ private TaskStatus generateAndPublishSegments(
         );
 
         log.debugSegments(published.getSegments(), "Published segments");
-
-        toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
+        List<Interval> ingestedIntervals = published.getSegments()
+                                                    .stream()
+                                                    .map(DataSegment::getInterval)
+                                                    .collect(Collectors.toList());
+        toolbox.getTaskReportFileWriter().write(

Review Comment:
   Druid formatting style suggestion for here and some other places:
   Try to fit method/constructor call in a single line, unless it is an exception. If you break, follow the guidelines below.
   
   https://github.com/apache/druid/blob/ad93635e4599adee2cf558cd1b9dbaaffbe9031e/codestyle/checkstyle.xml#L252-L276



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java:
##########
@@ -610,7 +610,8 @@ private Map<String, TaskReport> getTaskCompletionReports()
                 getTaskCompletionRowStats(),
                 errorMsg,
                 errorMsg == null,
-                0L
+                0L,
+                Collections.emptyList()

Review Comment:
   For realtime tasks, we should pass the `ingestedIntervals` as `null`. Passing empty can be misleading here.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexStats.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.Pair;
+import org.joda.time.Interval;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class ParallelIndexStats
+{
+  private final Map<String, Object> rowStats;
+  private final Map<String, Object> unparseableEvents;
+  private final Set<Interval> ingestedIntervals;
+
+  public ParallelIndexStats()
+  {
+    this.rowStats = ImmutableMap.of();
+    this.unparseableEvents = ImmutableMap.of();
+    this.ingestedIntervals = ImmutableSet.of();
+  }
+
+  public ParallelIndexStats(
+      Map<String, Object> rowStats,
+      Map<String, Object> unparseableEvents,
+      Set<Interval> ingestedIntervals
+  )
+  {
+    this.rowStats = rowStats;
+    this.unparseableEvents = unparseableEvents;
+    this.ingestedIntervals = ingestedIntervals;
+  }
+
+  public Pair<Map<String, Object>, Map<String, Object>> toRowStatsAndUnparseableEvents()
+  {
+    return Pair.of(rowStats, unparseableEvents);
+  }
+
+  public Map<String, Object> getRowStats()
+  {
+    return rowStats;
+  }
+
+  public Map<String, Object> getUnparseableEvents()
+  {
+    return unparseableEvents;
+  }
+
+  public Set<Interval> getIngestedIntervals()
+  {
+    return ingestedIntervals;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ParallelIndexStats that = (ParallelIndexStats) o;
+    return rowStats.equals(that.rowStats)
+           && unparseableEvents.equals(that.unparseableEvents)
+           && ingestedIntervals.equals(

Review Comment:
   Please stick to formatting style.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexStatsReporter.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskReport;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class ParallelIndexStatsReporter
+{
+  private static final Logger LOG = new Logger(ParallelIndexStatsReporter.class);
+
+  // Row ingestion meters fields
+  private static final String PROCESSED_FIELD = "processed";
+  private static final String PROCESSED_BYTES_FIELD = "processedBytes";
+  private static final String PROCESSED_WITH_ERROR_FIELD = "processedWithError";
+  private static final String THROWN_AWAY_FIELD = "thrownAway";
+  private static final String UNPARSEABLE_FIELD = "unparseable";
+  // Ingestion stats and errors report fields
+  private static final String INGESTION_STATS_AND_ERRORS_FIELD = "ingestionStatsAndErrors";
+  private static final String PAYLOAD_FIELD = "payload";
+  private static final String ROW_STATS_FIELD = "rowStats";
+  private static final String TOTALS_FIELD = "totals";
+  private static final String UNPARSEABLE_EVENTS_FIELD = "unparseableEvents";
+
+  abstract ParallelIndexStats report(

Review Comment:
   I took a look at this class and its sub-classes. I feel there is still some duplication between the single-phase and multi-phase implementations.
   
   For the overall design, here is what I am thinking:
   - We probably don't need multiple implementations and thus no need for the factory class either.
   - Have a single implementation `ParallelIndexTaskStatsReporter`.
   - Pass the task to its constructor.
   - `report` method (maybe renamed `reportStats` or `getStats` for less ambiguity) should take the following
      - set of running task ids
      - map of reports (the new interace `TaskReportContainer` should be of use here)
      - we shouldn't have to pass the task again here, neither should we pass the indexingRunner as a plain `Object`.
   - For parallel single-phase and multi-phase
     - the handling would be the same
   - For sequential
      - we shouldn't be using `ParallelIndexTaskStatsReporter` at all because it is sequential, not parallel
      - we should continue to do what the original code was doing
   



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -1540,201 +1548,31 @@ public Response getCompleteSubTaskSpecAttemptHistory(
     }
   }
 
-  private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object buildSegmentsRowStats)
-  {
-    if (buildSegmentsRowStats instanceof RowIngestionMetersTotals) {
-      // This case is for unit tests. Normally when deserialized the row stats will apppear as a Map<String, Object>.
-      return (RowIngestionMetersTotals) buildSegmentsRowStats;
-    } else if (buildSegmentsRowStats instanceof Map) {
-      Map<String, Object> buildSegmentsRowStatsMap = (Map<String, Object>) buildSegmentsRowStats;
-      return new RowIngestionMetersTotals(
-          ((Number) buildSegmentsRowStatsMap.get("processed")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("processedBytes")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("processedWithError")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue()
-      );
-    } else {
-      // should never happen
-      throw new RuntimeException("Unrecognized buildSegmentsRowStats type: " + buildSegmentsRowStats.getClass());
-    }
-  }
-
-  private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEventsParallelSinglePhase(
-      SinglePhaseParallelIndexTaskRunner parallelSinglePhaseRunner,
-      boolean includeUnparseable
-  )
-  {
-    final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
-
-    List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
-
-    // Get stats from completed tasks
-    Map<String, PushedSegmentsReport> completedSubtaskReports = parallelSinglePhaseRunner.getReports();
-    for (PushedSegmentsReport pushedSegmentsReport : completedSubtaskReports.values()) {
-      Map<String, TaskReport> taskReport = pushedSegmentsReport.getTaskReport();
-      if (taskReport == null || taskReport.isEmpty()) {
-        LOG.warn("Got an empty task report from subtask: " + pushedSegmentsReport.getTaskId());
-        continue;
-      }
-      RowIngestionMetersTotals rowIngestionMetersTotals =
-          getBuildSegmentsStatsFromTaskReport(taskReport, includeUnparseable, unparseableEvents);
-
-      buildSegmentsRowStats.addRowIngestionMetersTotals(rowIngestionMetersTotals);
-    }
-
-    RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
-        parallelSinglePhaseRunner.getRunningTaskIds(),
-        unparseableEvents,
-        includeUnparseable
-    );
-    buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
-
-    return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
-  }
-
-  private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEventsParallelMultiPhase(
-      ParallelIndexTaskRunner<?, ?> currentRunner,
-      boolean includeUnparseable
-  )
-  {
-    if (indexGenerateRowStats != null) {
-      return Pair.of(indexGenerateRowStats.lhs, includeUnparseable ? indexGenerateRowStats.rhs : ImmutableMap.of());
-    } else if (!currentRunner.getName().equals("partial segment generation")) {
-      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
-    } else {
-      Map<String, GeneratedPartitionsReport> completedSubtaskReports =
-          (Map<String, GeneratedPartitionsReport>) currentRunner.getReports();
-
-      final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
-      final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
-      for (GeneratedPartitionsReport generatedPartitionsReport : completedSubtaskReports.values()) {
-        Map<String, TaskReport> taskReport = generatedPartitionsReport.getTaskReport();
-        if (taskReport == null || taskReport.isEmpty()) {
-          LOG.warn("Got an empty task report from subtask: " + generatedPartitionsReport.getTaskId());
-          continue;
-        }
-        RowIngestionMetersTotals rowStatsForCompletedTask =
-            getBuildSegmentsStatsFromTaskReport(taskReport, true, unparseableEvents);
-
-        buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask);
-      }
-
-      RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
-          currentRunner.getRunningTaskIds(),
-          unparseableEvents,
-          includeUnparseable
-      );
-      buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
-
-      return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
-    }
-  }
-
-  private RowIngestionMetersTotals getRowStatsAndUnparseableEventsForRunningTasks(
-      Set<String> runningTaskIds,
-      List<ParseExceptionReport> unparseableEvents,
-      boolean includeUnparseable
-  )
+  @Nullable
+  Map<String, Object> fetchTaskReport(String taskId)
   {
-    final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
-    for (String runningTaskId : runningTaskIds) {
-      try {
-        final Map<String, Object> report = getTaskReport(toolbox.getOverlordClient(), runningTaskId);
-
-        if (report == null || report.isEmpty()) {
-          // task does not have a running report yet
-          continue;
-        }
-
-        Map<String, Object> ingestionStatsAndErrors = (Map<String, Object>) report.get("ingestionStatsAndErrors");
-        Map<String, Object> payload = (Map<String, Object>) ingestionStatsAndErrors.get("payload");
-        Map<String, Object> rowStats = (Map<String, Object>) payload.get("rowStats");
-        Map<String, Object> totals = (Map<String, Object>) rowStats.get("totals");
-        Map<String, Object> buildSegments = (Map<String, Object>) totals.get(RowIngestionMeters.BUILD_SEGMENTS);
-
-        if (includeUnparseable) {
-          Map<String, Object> taskUnparseableEvents = (Map<String, Object>) payload.get("unparseableEvents");
-          List<ParseExceptionReport> buildSegmentsUnparseableEvents = (List<ParseExceptionReport>)
-              taskUnparseableEvents.get(RowIngestionMeters.BUILD_SEGMENTS);
-          unparseableEvents.addAll(buildSegmentsUnparseableEvents);
-        }
-
-        buildSegmentsRowStats.addRowIngestionMetersTotals(getTotalsFromBuildSegmentsRowStats(buildSegments));
-      }
-      catch (Exception e) {
-        LOG.warn(e, "Encountered exception when getting live subtask report for task: " + runningTaskId);
-      }
+    try {
+      return getTaskReport(toolbox.getOverlordClient(), taskId);
     }
-    return buildSegmentsRowStats.getTotals();
-  }
-
-  private Pair<Map<String, Object>, Map<String, Object>> createStatsAndErrorsReport(
-      RowIngestionMetersTotals rowStats,
-      List<ParseExceptionReport> unparseableEvents
-  )
-  {
-    Map<String, Object> rowStatsMap = new HashMap<>();
-    Map<String, Object> totalsMap = new HashMap<>();
-    totalsMap.put(RowIngestionMeters.BUILD_SEGMENTS, rowStats);
-    rowStatsMap.put("totals", totalsMap);
-
-    return Pair.of(rowStatsMap, ImmutableMap.of(RowIngestionMeters.BUILD_SEGMENTS, unparseableEvents));
-  }
-
-  private RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(
-      Map<String, TaskReport> taskReport,
-      boolean includeUnparseable,
-      List<ParseExceptionReport> unparseableEvents)
-  {
-    IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport = (IngestionStatsAndErrorsTaskReport) taskReport.get(
-        IngestionStatsAndErrorsTaskReport.REPORT_KEY);
-    IngestionStatsAndErrorsTaskReportData reportData =
-        (IngestionStatsAndErrorsTaskReportData) ingestionStatsAndErrorsReport.getPayload();
-    RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
-        reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
-    );
-    if (includeUnparseable) {
-      List<ParseExceptionReport> taskUnparsebleEvents = (List<ParseExceptionReport>) reportData.getUnparseableEvents()
-                                                                    .get(RowIngestionMeters.BUILD_SEGMENTS);
-      unparseableEvents.addAll(taskUnparsebleEvents);
+    catch (Exception e) {
+      LOG.warn(e, "Encountered exception when getting live subtask report for task: " + taskId);
     }
-    return totals;
+    return null;
   }
 
-  private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEvents(
-      String full,
-      boolean includeUnparseable
-  )
+  private ParallelIndexStats doGetRowStatsAndUnparseableEvents(boolean full, boolean includeUnparseable)
   {
     if (currentSubTaskHolder == null) {
-      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+      return new ParallelIndexStats();
     }
 
     Object currentRunner = currentSubTaskHolder.getTask();
     if (currentRunner == null) {
-      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+      return new ParallelIndexStats();
     }
 
-    if (isParallelMode()) {

Review Comment:
   I think we can retain this if-else flow (but with some clean up as you have done).
   - for sequential, we will just return here
   - for single phase, we will create a `ParallelIndexStatsReporter` and do the computations
   - for multi phase, we will have a method in this class itself, which checks if it is already cached or not, otherwise, it constructs a `ParallelIndexStatsReporter`.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java:
##########
@@ -323,7 +324,7 @@ public TaskStatus runTask(TaskToolbox toolbox)
       }
 
       errorMsg = Throwables.getStackTraceAsString(effectiveException);
-      toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
+      toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports(Collections.emptyList()));

Review Comment:
   I think we should pass empty only when we are certain that the task didn't ingest any interval. Otherwise, we should use `null`. In the future, we might want to clarify which intervals were ingested before the task failed. This could help in debugging a failing task.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexStats.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.Pair;
+import org.joda.time.Interval;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class ParallelIndexStats

Review Comment:
   Maybe `ParallelIndexTaskStats` instead?



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexStatsReporter.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskReport;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class ParallelIndexStatsReporter

Review Comment:
   Please add a small javadoc.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1105,7 +1105,8 @@ private Map<String, TaskReport> getTaskCompletionReports(@Nullable String errorM
                 getTaskCompletionRowStats(),
                 errorMsg,
                 errorMsg == null,
-                handoffWaitMs
+                handoffWaitMs,
+                Collections.emptyList()

Review Comment:
   Streaming ingestion should set ingestedIntervals to null for the time being.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexStatsReporter.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskReport;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class MultiPhaseParallelIndexStatsReporter extends ParallelIndexStatsReporter
+{
+  private static final Logger LOG = new Logger(MultiPhaseParallelIndexStatsReporter.class);
+
+  @Override
+  ParallelIndexStats report(
+      ParallelIndexSupervisorTask task,
+      Object runner,
+      boolean includeUnparseable,
+      boolean full
+  )
+  {
+    // use cached version if available
+    ParallelIndexStats cached = task.getIndexGenerateRowStats();
+    if (null != cached) {
+      return cached;
+    }
+
+    ParallelIndexTaskRunner<?, ?> currentRunner = (ParallelIndexTaskRunner<?, ?>) runner;
+    if (!InputSourceSplitParallelIndexTaskRunner.PARTIAL_SEGMENT_GENERATION_PHASE.equals(currentRunner.getName())) {
+      return new ParallelIndexStats();
+    }
+
+    Map<String, GeneratedPartitionsReport> completedSubtaskReports =
+        (Map<String, GeneratedPartitionsReport>) currentRunner.getReports();
+
+    final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
+    final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
+    Set<Interval> intervalsIngested = new HashSet<>();
+    for (GeneratedPartitionsReport generatedPartitionsReport : completedSubtaskReports.values()) {
+      Map<String, TaskReport> taskReport = generatedPartitionsReport.getTaskReport();
+      if (taskReport == null || taskReport.isEmpty()) {
+        LOG.warn("Got an empty task report from subtask: " + generatedPartitionsReport.getTaskId());
+        continue;
+      }
+
+      IngestionStatsAndErrorsTaskReport iseReport =
+          (IngestionStatsAndErrorsTaskReport) taskReport.get(IngestionStatsAndErrorsTaskReport.REPORT_KEY);
+      IngestionStatsAndErrorsTaskReportData payload = (IngestionStatsAndErrorsTaskReportData) iseReport.getPayload();
+      intervalsIngested.addAll(payload.getIngestedIntervals());
+
+      RowIngestionMetersTotals rowStatsForCompletedTask =
+          getBuildSegmentsStatsFromTaskReport(taskReport, true, unparseableEvents);
+
+      buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask);
+    }
+
+    RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
+        task,
+        currentRunner.getRunningTaskIds(),
+        unparseableEvents,
+        includeUnparseable
+    );
+    buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
+
+    Pair<Map<String, Object>, Map<String, Object>> report =
+        createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
+
+    return new ParallelIndexStats(

Review Comment:
   See formatting comment.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -1540,201 +1548,31 @@ public Response getCompleteSubTaskSpecAttemptHistory(
     }
   }
 
-  private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object buildSegmentsRowStats)
-  {
-    if (buildSegmentsRowStats instanceof RowIngestionMetersTotals) {
-      // This case is for unit tests. Normally when deserialized the row stats will apppear as a Map<String, Object>.
-      return (RowIngestionMetersTotals) buildSegmentsRowStats;
-    } else if (buildSegmentsRowStats instanceof Map) {
-      Map<String, Object> buildSegmentsRowStatsMap = (Map<String, Object>) buildSegmentsRowStats;
-      return new RowIngestionMetersTotals(
-          ((Number) buildSegmentsRowStatsMap.get("processed")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("processedBytes")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("processedWithError")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue()
-      );
-    } else {
-      // should never happen
-      throw new RuntimeException("Unrecognized buildSegmentsRowStats type: " + buildSegmentsRowStats.getClass());
-    }
-  }
-
-  private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEventsParallelSinglePhase(
-      SinglePhaseParallelIndexTaskRunner parallelSinglePhaseRunner,
-      boolean includeUnparseable
-  )
-  {
-    final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
-
-    List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
-
-    // Get stats from completed tasks
-    Map<String, PushedSegmentsReport> completedSubtaskReports = parallelSinglePhaseRunner.getReports();
-    for (PushedSegmentsReport pushedSegmentsReport : completedSubtaskReports.values()) {
-      Map<String, TaskReport> taskReport = pushedSegmentsReport.getTaskReport();
-      if (taskReport == null || taskReport.isEmpty()) {
-        LOG.warn("Got an empty task report from subtask: " + pushedSegmentsReport.getTaskId());
-        continue;
-      }
-      RowIngestionMetersTotals rowIngestionMetersTotals =
-          getBuildSegmentsStatsFromTaskReport(taskReport, includeUnparseable, unparseableEvents);
-
-      buildSegmentsRowStats.addRowIngestionMetersTotals(rowIngestionMetersTotals);
-    }
-
-    RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
-        parallelSinglePhaseRunner.getRunningTaskIds(),
-        unparseableEvents,
-        includeUnparseable
-    );
-    buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
-
-    return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
-  }
-
-  private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEventsParallelMultiPhase(
-      ParallelIndexTaskRunner<?, ?> currentRunner,
-      boolean includeUnparseable
-  )
-  {
-    if (indexGenerateRowStats != null) {
-      return Pair.of(indexGenerateRowStats.lhs, includeUnparseable ? indexGenerateRowStats.rhs : ImmutableMap.of());
-    } else if (!currentRunner.getName().equals("partial segment generation")) {
-      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
-    } else {
-      Map<String, GeneratedPartitionsReport> completedSubtaskReports =
-          (Map<String, GeneratedPartitionsReport>) currentRunner.getReports();
-
-      final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
-      final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
-      for (GeneratedPartitionsReport generatedPartitionsReport : completedSubtaskReports.values()) {
-        Map<String, TaskReport> taskReport = generatedPartitionsReport.getTaskReport();
-        if (taskReport == null || taskReport.isEmpty()) {
-          LOG.warn("Got an empty task report from subtask: " + generatedPartitionsReport.getTaskId());
-          continue;
-        }
-        RowIngestionMetersTotals rowStatsForCompletedTask =
-            getBuildSegmentsStatsFromTaskReport(taskReport, true, unparseableEvents);
-
-        buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask);
-      }
-
-      RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
-          currentRunner.getRunningTaskIds(),
-          unparseableEvents,
-          includeUnparseable
-      );
-      buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
-
-      return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
-    }
-  }
-
-  private RowIngestionMetersTotals getRowStatsAndUnparseableEventsForRunningTasks(
-      Set<String> runningTaskIds,
-      List<ParseExceptionReport> unparseableEvents,
-      boolean includeUnparseable
-  )
+  @Nullable
+  Map<String, Object> fetchTaskReport(String taskId)
   {
-    final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();

Review Comment:
   Thanks for moving all of this logic to the new class! Seems much cleaner now.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -591,7 +595,7 @@ private static boolean useRangePartitions(ParallelIndexTuningConfig tuningConfig
     return tuningConfig.getGivenOrDefaultPartitionsSpec() instanceof DimensionRangePartitionsSpec;
   }
 
-  private boolean isParallelMode()
+  boolean isParallelMode()
   {
     return isParallelMode(baseInputSource, ingestionSchema.getTuningConfig());

Review Comment:
   Seems like this should be computed just once when this task is instantiated and assigned to a final field. But we need not do it in this PR.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java:
##########
@@ -246,7 +249,8 @@ private Map<String, TaskReport> getTaskCompletionReports()
                 getTaskCompletionRowStats(),
                 "",
                 false, // not applicable for parallel subtask
-                segmentAvailabilityWaitTimeMs
+                segmentAvailabilityWaitTimeMs,
+                intervals

Review Comment:
   @danprince1 , condensing intervals would still help in reducing the payload size of the report sent from sub-task to parent task.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexStatsReporter.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskReport;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class MultiPhaseParallelIndexStatsReporter extends ParallelIndexStatsReporter
+{
+  private static final Logger LOG = new Logger(MultiPhaseParallelIndexStatsReporter.class);
+
+  @Override
+  ParallelIndexStats report(
+      ParallelIndexSupervisorTask task,
+      Object runner,
+      boolean includeUnparseable,
+      boolean full
+  )
+  {
+    // use cached version if available
+    ParallelIndexStats cached = task.getIndexGenerateRowStats();
+    if (null != cached) {
+      return cached;
+    }
+
+    ParallelIndexTaskRunner<?, ?> currentRunner = (ParallelIndexTaskRunner<?, ?>) runner;
+    if (!InputSourceSplitParallelIndexTaskRunner.PARTIAL_SEGMENT_GENERATION_PHASE.equals(currentRunner.getName())) {
+      return new ParallelIndexStats();
+    }

Review Comment:
   Ideally, this logic should remain in the calling task itself. The task should call the report method only when we want the computation to happen.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] kfaraz commented on a diff in pull request #13758: add ingested intervals to task report

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1183522014


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -1540,201 +1548,31 @@ public Response getCompleteSubTaskSpecAttemptHistory(
     }
   }
 
-  private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object buildSegmentsRowStats)
-  {
-    if (buildSegmentsRowStats instanceof RowIngestionMetersTotals) {
-      // This case is for unit tests. Normally when deserialized the row stats will apppear as a Map<String, Object>.
-      return (RowIngestionMetersTotals) buildSegmentsRowStats;
-    } else if (buildSegmentsRowStats instanceof Map) {
-      Map<String, Object> buildSegmentsRowStatsMap = (Map<String, Object>) buildSegmentsRowStats;
-      return new RowIngestionMetersTotals(
-          ((Number) buildSegmentsRowStatsMap.get("processed")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("processedBytes")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("processedWithError")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue()
-      );
-    } else {
-      // should never happen
-      throw new RuntimeException("Unrecognized buildSegmentsRowStats type: " + buildSegmentsRowStats.getClass());
-    }
-  }
-
-  private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEventsParallelSinglePhase(
-      SinglePhaseParallelIndexTaskRunner parallelSinglePhaseRunner,
-      boolean includeUnparseable
-  )
-  {
-    final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
-
-    List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
-
-    // Get stats from completed tasks
-    Map<String, PushedSegmentsReport> completedSubtaskReports = parallelSinglePhaseRunner.getReports();
-    for (PushedSegmentsReport pushedSegmentsReport : completedSubtaskReports.values()) {
-      Map<String, TaskReport> taskReport = pushedSegmentsReport.getTaskReport();
-      if (taskReport == null || taskReport.isEmpty()) {
-        LOG.warn("Got an empty task report from subtask: " + pushedSegmentsReport.getTaskId());
-        continue;
-      }
-      RowIngestionMetersTotals rowIngestionMetersTotals =
-          getBuildSegmentsStatsFromTaskReport(taskReport, includeUnparseable, unparseableEvents);
-
-      buildSegmentsRowStats.addRowIngestionMetersTotals(rowIngestionMetersTotals);
-    }
-
-    RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
-        parallelSinglePhaseRunner.getRunningTaskIds(),
-        unparseableEvents,
-        includeUnparseable
-    );
-    buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
-
-    return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
-  }
-
-  private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEventsParallelMultiPhase(
-      ParallelIndexTaskRunner<?, ?> currentRunner,
-      boolean includeUnparseable
-  )
-  {
-    if (indexGenerateRowStats != null) {
-      return Pair.of(indexGenerateRowStats.lhs, includeUnparseable ? indexGenerateRowStats.rhs : ImmutableMap.of());
-    } else if (!currentRunner.getName().equals("partial segment generation")) {
-      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
-    } else {
-      Map<String, GeneratedPartitionsReport> completedSubtaskReports =
-          (Map<String, GeneratedPartitionsReport>) currentRunner.getReports();
-
-      final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
-      final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
-      for (GeneratedPartitionsReport generatedPartitionsReport : completedSubtaskReports.values()) {
-        Map<String, TaskReport> taskReport = generatedPartitionsReport.getTaskReport();
-        if (taskReport == null || taskReport.isEmpty()) {
-          LOG.warn("Got an empty task report from subtask: " + generatedPartitionsReport.getTaskId());
-          continue;
-        }
-        RowIngestionMetersTotals rowStatsForCompletedTask =
-            getBuildSegmentsStatsFromTaskReport(taskReport, true, unparseableEvents);
-
-        buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask);
-      }
-
-      RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
-          currentRunner.getRunningTaskIds(),
-          unparseableEvents,
-          includeUnparseable
-      );
-      buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
-
-      return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
-    }
-  }
-
-  private RowIngestionMetersTotals getRowStatsAndUnparseableEventsForRunningTasks(
-      Set<String> runningTaskIds,
-      List<ParseExceptionReport> unparseableEvents,
-      boolean includeUnparseable
-  )
+  @Nullable
+  Map<String, Object> fetchTaskReport(String taskId)
   {
-    final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
-    for (String runningTaskId : runningTaskIds) {
-      try {
-        final Map<String, Object> report = getTaskReport(toolbox.getOverlordClient(), runningTaskId);
-
-        if (report == null || report.isEmpty()) {
-          // task does not have a running report yet
-          continue;
-        }
-
-        Map<String, Object> ingestionStatsAndErrors = (Map<String, Object>) report.get("ingestionStatsAndErrors");
-        Map<String, Object> payload = (Map<String, Object>) ingestionStatsAndErrors.get("payload");
-        Map<String, Object> rowStats = (Map<String, Object>) payload.get("rowStats");
-        Map<String, Object> totals = (Map<String, Object>) rowStats.get("totals");
-        Map<String, Object> buildSegments = (Map<String, Object>) totals.get(RowIngestionMeters.BUILD_SEGMENTS);
-
-        if (includeUnparseable) {
-          Map<String, Object> taskUnparseableEvents = (Map<String, Object>) payload.get("unparseableEvents");
-          List<ParseExceptionReport> buildSegmentsUnparseableEvents = (List<ParseExceptionReport>)
-              taskUnparseableEvents.get(RowIngestionMeters.BUILD_SEGMENTS);
-          unparseableEvents.addAll(buildSegmentsUnparseableEvents);
-        }
-
-        buildSegmentsRowStats.addRowIngestionMetersTotals(getTotalsFromBuildSegmentsRowStats(buildSegments));
-      }
-      catch (Exception e) {
-        LOG.warn(e, "Encountered exception when getting live subtask report for task: " + runningTaskId);
-      }
+    try {
+      return getTaskReport(toolbox.getOverlordClient(), taskId);
     }
-    return buildSegmentsRowStats.getTotals();
-  }
-
-  private Pair<Map<String, Object>, Map<String, Object>> createStatsAndErrorsReport(
-      RowIngestionMetersTotals rowStats,
-      List<ParseExceptionReport> unparseableEvents
-  )
-  {
-    Map<String, Object> rowStatsMap = new HashMap<>();
-    Map<String, Object> totalsMap = new HashMap<>();
-    totalsMap.put(RowIngestionMeters.BUILD_SEGMENTS, rowStats);
-    rowStatsMap.put("totals", totalsMap);
-
-    return Pair.of(rowStatsMap, ImmutableMap.of(RowIngestionMeters.BUILD_SEGMENTS, unparseableEvents));
-  }
-
-  private RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(
-      Map<String, TaskReport> taskReport,
-      boolean includeUnparseable,
-      List<ParseExceptionReport> unparseableEvents)
-  {
-    IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport = (IngestionStatsAndErrorsTaskReport) taskReport.get(
-        IngestionStatsAndErrorsTaskReport.REPORT_KEY);
-    IngestionStatsAndErrorsTaskReportData reportData =
-        (IngestionStatsAndErrorsTaskReportData) ingestionStatsAndErrorsReport.getPayload();
-    RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
-        reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
-    );
-    if (includeUnparseable) {
-      List<ParseExceptionReport> taskUnparsebleEvents = (List<ParseExceptionReport>) reportData.getUnparseableEvents()
-                                                                    .get(RowIngestionMeters.BUILD_SEGMENTS);
-      unparseableEvents.addAll(taskUnparsebleEvents);
+    catch (Exception e) {
+      LOG.warn(e, "Encountered exception when getting live subtask report for task: " + taskId);
     }
-    return totals;
+    return null;
   }
 
-  private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEvents(
-      String full,
-      boolean includeUnparseable
-  )
+  private ParallelIndexStats doGetRowStatsAndUnparseableEvents(boolean full, boolean includeUnparseable)
   {
     if (currentSubTaskHolder == null) {
-      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+      return new ParallelIndexStats();
     }
 
     Object currentRunner = currentSubTaskHolder.getTask();
     if (currentRunner == null) {
-      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+      return new ParallelIndexStats();
     }
 
-    if (isParallelMode()) {

Review Comment:
   I think we can retain this if-else flow (but with some of the clean up that you have done).
   - for sequential, we will just create the `ParallelIndexTaskStats` object here itself.
   - for single phase, we will create a `ParallelIndexStatsReporter` and do the computations
   - for multi phase, we will call a method, which checks if the stats are already cached and if the current phase is the correct one. If not, it constructs a `ParallelIndexStatsReporter`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] kfaraz commented on a diff in pull request #13758: add ingested intervals to task report

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1183423168


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexStats.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.Pair;
+import org.joda.time.Interval;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class ParallelIndexStats
+{
+  private final Map<String, Object> rowStats;
+  private final Map<String, Object> unparseableEvents;
+  private final Set<Interval> ingestedIntervals;
+
+  public ParallelIndexStats()
+  {
+    this.rowStats = ImmutableMap.of();
+    this.unparseableEvents = ImmutableMap.of();
+    this.ingestedIntervals = ImmutableSet.of();
+  }
+
+  public ParallelIndexStats(
+      Map<String, Object> rowStats,
+      Map<String, Object> unparseableEvents,
+      Set<Interval> ingestedIntervals
+  )
+  {
+    this.rowStats = rowStats;
+    this.unparseableEvents = unparseableEvents;
+    this.ingestedIntervals = ingestedIntervals;
+  }
+
+  public Pair<Map<String, Object>, Map<String, Object>> toRowStatsAndUnparseableEvents()
+  {
+    return Pair.of(rowStats, unparseableEvents);
+  }
+
+  public Map<String, Object> getRowStats()
+  {
+    return rowStats;
+  }
+
+  public Map<String, Object> getUnparseableEvents()
+  {
+    return unparseableEvents;
+  }
+
+  public Set<Interval> getIngestedIntervals()
+  {
+    return ingestedIntervals;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ParallelIndexStats that = (ParallelIndexStats) o;
+    return rowStats.equals(that.rowStats)
+           && unparseableEvents.equals(that.unparseableEvents)
+           && ingestedIntervals.equals(

Review Comment:
   Please use the Druid formatting style.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] kfaraz commented on pull request #13758: add ingested intervals to task report

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on PR #13758:
URL: https://github.com/apache/druid/pull/13758#issuecomment-1420147831

   Thanks for your first PR, @danprince1 !
   Could you please add a note in the PR description on the uses of `ingestedIntervals`?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] danprince1 commented on a diff in pull request #13758: add ingested intervals to task report

Posted by "danprince1 (via GitHub)" <gi...@apache.org>.
danprince1 commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1154889897


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexStatsReporter.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskReport;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class ParallelIndexStatsReporter
+{
+  private static final Logger LOG = new Logger(ParallelIndexStatsReporter.class);
+
+  abstract ParallelIndexStats report(
+      ParallelIndexSupervisorTask task,
+      Object runner,
+      boolean includeUnparseable,
+      String full
+  );
+
+  protected RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(
+      Map<String, TaskReport> taskReport,
+      boolean includeUnparseable,
+      List<ParseExceptionReport> unparseableEvents
+  )
+  {
+    IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport =
+        (IngestionStatsAndErrorsTaskReport) taskReport.get(
+            IngestionStatsAndErrorsTaskReport.REPORT_KEY);
+    IngestionStatsAndErrorsTaskReportData reportData =
+        (IngestionStatsAndErrorsTaskReportData) ingestionStatsAndErrorsReport.getPayload();
+    RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
+        reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
+    );
+    if (includeUnparseable) {
+      List<ParseExceptionReport> taskUnparsebleEvents =
+          (List<ParseExceptionReport>) reportData.getUnparseableEvents().get(RowIngestionMeters.BUILD_SEGMENTS);
+      unparseableEvents.addAll(taskUnparsebleEvents);
+    }
+    return totals;
+  }
+
+  private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object buildSegmentsRowStats)
+  {
+    if (buildSegmentsRowStats instanceof RowIngestionMetersTotals) {
+      // This case is for unit tests. Normally when deserialized the row stats will apppear as a Map<String, Object>.
+      return (RowIngestionMetersTotals) buildSegmentsRowStats;
+    } else if (buildSegmentsRowStats instanceof Map) {
+      Map<String, Object> buildSegmentsRowStatsMap = (Map<String, Object>) buildSegmentsRowStats;
+      return new RowIngestionMetersTotals(
+          ((Number) buildSegmentsRowStatsMap.get("processed")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("processedBytes")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("processedWithError")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue()
+      );
+    } else {
+      // should never happen
+      throw new RuntimeException("Unrecognized buildSegmentsRowStats type: " + buildSegmentsRowStats.getClass());
+    }
+  }
+
+  protected RowIngestionMetersTotals getRowStatsAndUnparseableEventsForRunningTasks(
+      ParallelIndexSupervisorTask task,
+      Set<String> runningTaskIds,
+      List<ParseExceptionReport> unparseableEvents,
+      boolean includeUnparseable
+  )
+  {
+    final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
+    for (String runningTaskId : runningTaskIds) {
+      try {
+        final Map<String, Object> report = task.fetchTaskReport(runningTaskId);
+        if (report == null || report.isEmpty()) {
+          // task does not have a running report yet
+          continue;
+        }
+
+        Map<String, Object> ingestionStatsAndErrors = (Map<String, Object>) report.get("ingestionStatsAndErrors");
+        Map<String, Object> payload = (Map<String, Object>) ingestionStatsAndErrors.get("payload");
+        Map<String, Object> rowStats = (Map<String, Object>) payload.get("rowStats");
+        Map<String, Object> totals = (Map<String, Object>) rowStats.get("totals");

Review Comment:
   Good idea - done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] kfaraz commented on a diff in pull request #13758: add ingested intervals to task report

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1183522014


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -1540,201 +1548,31 @@ public Response getCompleteSubTaskSpecAttemptHistory(
     }
   }
 
-  private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object buildSegmentsRowStats)
-  {
-    if (buildSegmentsRowStats instanceof RowIngestionMetersTotals) {
-      // This case is for unit tests. Normally when deserialized the row stats will apppear as a Map<String, Object>.
-      return (RowIngestionMetersTotals) buildSegmentsRowStats;
-    } else if (buildSegmentsRowStats instanceof Map) {
-      Map<String, Object> buildSegmentsRowStatsMap = (Map<String, Object>) buildSegmentsRowStats;
-      return new RowIngestionMetersTotals(
-          ((Number) buildSegmentsRowStatsMap.get("processed")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("processedBytes")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("processedWithError")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue()
-      );
-    } else {
-      // should never happen
-      throw new RuntimeException("Unrecognized buildSegmentsRowStats type: " + buildSegmentsRowStats.getClass());
-    }
-  }
-
-  private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEventsParallelSinglePhase(
-      SinglePhaseParallelIndexTaskRunner parallelSinglePhaseRunner,
-      boolean includeUnparseable
-  )
-  {
-    final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
-
-    List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
-
-    // Get stats from completed tasks
-    Map<String, PushedSegmentsReport> completedSubtaskReports = parallelSinglePhaseRunner.getReports();
-    for (PushedSegmentsReport pushedSegmentsReport : completedSubtaskReports.values()) {
-      Map<String, TaskReport> taskReport = pushedSegmentsReport.getTaskReport();
-      if (taskReport == null || taskReport.isEmpty()) {
-        LOG.warn("Got an empty task report from subtask: " + pushedSegmentsReport.getTaskId());
-        continue;
-      }
-      RowIngestionMetersTotals rowIngestionMetersTotals =
-          getBuildSegmentsStatsFromTaskReport(taskReport, includeUnparseable, unparseableEvents);
-
-      buildSegmentsRowStats.addRowIngestionMetersTotals(rowIngestionMetersTotals);
-    }
-
-    RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
-        parallelSinglePhaseRunner.getRunningTaskIds(),
-        unparseableEvents,
-        includeUnparseable
-    );
-    buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
-
-    return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
-  }
-
-  private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEventsParallelMultiPhase(
-      ParallelIndexTaskRunner<?, ?> currentRunner,
-      boolean includeUnparseable
-  )
-  {
-    if (indexGenerateRowStats != null) {
-      return Pair.of(indexGenerateRowStats.lhs, includeUnparseable ? indexGenerateRowStats.rhs : ImmutableMap.of());
-    } else if (!currentRunner.getName().equals("partial segment generation")) {
-      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
-    } else {
-      Map<String, GeneratedPartitionsReport> completedSubtaskReports =
-          (Map<String, GeneratedPartitionsReport>) currentRunner.getReports();
-
-      final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
-      final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
-      for (GeneratedPartitionsReport generatedPartitionsReport : completedSubtaskReports.values()) {
-        Map<String, TaskReport> taskReport = generatedPartitionsReport.getTaskReport();
-        if (taskReport == null || taskReport.isEmpty()) {
-          LOG.warn("Got an empty task report from subtask: " + generatedPartitionsReport.getTaskId());
-          continue;
-        }
-        RowIngestionMetersTotals rowStatsForCompletedTask =
-            getBuildSegmentsStatsFromTaskReport(taskReport, true, unparseableEvents);
-
-        buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask);
-      }
-
-      RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
-          currentRunner.getRunningTaskIds(),
-          unparseableEvents,
-          includeUnparseable
-      );
-      buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
-
-      return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
-    }
-  }
-
-  private RowIngestionMetersTotals getRowStatsAndUnparseableEventsForRunningTasks(
-      Set<String> runningTaskIds,
-      List<ParseExceptionReport> unparseableEvents,
-      boolean includeUnparseable
-  )
+  @Nullable
+  Map<String, Object> fetchTaskReport(String taskId)
   {
-    final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
-    for (String runningTaskId : runningTaskIds) {
-      try {
-        final Map<String, Object> report = getTaskReport(toolbox.getOverlordClient(), runningTaskId);
-
-        if (report == null || report.isEmpty()) {
-          // task does not have a running report yet
-          continue;
-        }
-
-        Map<String, Object> ingestionStatsAndErrors = (Map<String, Object>) report.get("ingestionStatsAndErrors");
-        Map<String, Object> payload = (Map<String, Object>) ingestionStatsAndErrors.get("payload");
-        Map<String, Object> rowStats = (Map<String, Object>) payload.get("rowStats");
-        Map<String, Object> totals = (Map<String, Object>) rowStats.get("totals");
-        Map<String, Object> buildSegments = (Map<String, Object>) totals.get(RowIngestionMeters.BUILD_SEGMENTS);
-
-        if (includeUnparseable) {
-          Map<String, Object> taskUnparseableEvents = (Map<String, Object>) payload.get("unparseableEvents");
-          List<ParseExceptionReport> buildSegmentsUnparseableEvents = (List<ParseExceptionReport>)
-              taskUnparseableEvents.get(RowIngestionMeters.BUILD_SEGMENTS);
-          unparseableEvents.addAll(buildSegmentsUnparseableEvents);
-        }
-
-        buildSegmentsRowStats.addRowIngestionMetersTotals(getTotalsFromBuildSegmentsRowStats(buildSegments));
-      }
-      catch (Exception e) {
-        LOG.warn(e, "Encountered exception when getting live subtask report for task: " + runningTaskId);
-      }
+    try {
+      return getTaskReport(toolbox.getOverlordClient(), taskId);
     }
-    return buildSegmentsRowStats.getTotals();
-  }
-
-  private Pair<Map<String, Object>, Map<String, Object>> createStatsAndErrorsReport(
-      RowIngestionMetersTotals rowStats,
-      List<ParseExceptionReport> unparseableEvents
-  )
-  {
-    Map<String, Object> rowStatsMap = new HashMap<>();
-    Map<String, Object> totalsMap = new HashMap<>();
-    totalsMap.put(RowIngestionMeters.BUILD_SEGMENTS, rowStats);
-    rowStatsMap.put("totals", totalsMap);
-
-    return Pair.of(rowStatsMap, ImmutableMap.of(RowIngestionMeters.BUILD_SEGMENTS, unparseableEvents));
-  }
-
-  private RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(
-      Map<String, TaskReport> taskReport,
-      boolean includeUnparseable,
-      List<ParseExceptionReport> unparseableEvents)
-  {
-    IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport = (IngestionStatsAndErrorsTaskReport) taskReport.get(
-        IngestionStatsAndErrorsTaskReport.REPORT_KEY);
-    IngestionStatsAndErrorsTaskReportData reportData =
-        (IngestionStatsAndErrorsTaskReportData) ingestionStatsAndErrorsReport.getPayload();
-    RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
-        reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
-    );
-    if (includeUnparseable) {
-      List<ParseExceptionReport> taskUnparsebleEvents = (List<ParseExceptionReport>) reportData.getUnparseableEvents()
-                                                                    .get(RowIngestionMeters.BUILD_SEGMENTS);
-      unparseableEvents.addAll(taskUnparsebleEvents);
+    catch (Exception e) {
+      LOG.warn(e, "Encountered exception when getting live subtask report for task: " + taskId);
     }
-    return totals;
+    return null;
   }
 
-  private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEvents(
-      String full,
-      boolean includeUnparseable
-  )
+  private ParallelIndexStats doGetRowStatsAndUnparseableEvents(boolean full, boolean includeUnparseable)
   {
     if (currentSubTaskHolder == null) {
-      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+      return new ParallelIndexStats();
     }
 
     Object currentRunner = currentSubTaskHolder.getTask();
     if (currentRunner == null) {
-      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+      return new ParallelIndexStats();
     }
 
-    if (isParallelMode()) {

Review Comment:
   I think we can retain this if-else flow (but with some clean up as you have done).
   - for sequential, we will just create the `ParallelIndexTaskStats` object here itself.
   - for single phase, we will create a `ParallelIndexStatsReporter` and do the computations
   - for multi phase, we will have a method in this class itself, which checks if it is already cached and checks if the current phase is the correct one, otherwise, it constructs a `ParallelIndexStatsReporter`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] kfaraz commented on a diff in pull request #13758: add ingested intervals to task report

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1183522014


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -1540,201 +1548,31 @@ public Response getCompleteSubTaskSpecAttemptHistory(
     }
   }
 
-  private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object buildSegmentsRowStats)
-  {
-    if (buildSegmentsRowStats instanceof RowIngestionMetersTotals) {
-      // This case is for unit tests. Normally when deserialized the row stats will apppear as a Map<String, Object>.
-      return (RowIngestionMetersTotals) buildSegmentsRowStats;
-    } else if (buildSegmentsRowStats instanceof Map) {
-      Map<String, Object> buildSegmentsRowStatsMap = (Map<String, Object>) buildSegmentsRowStats;
-      return new RowIngestionMetersTotals(
-          ((Number) buildSegmentsRowStatsMap.get("processed")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("processedBytes")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("processedWithError")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue()
-      );
-    } else {
-      // should never happen
-      throw new RuntimeException("Unrecognized buildSegmentsRowStats type: " + buildSegmentsRowStats.getClass());
-    }
-  }
-
-  private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEventsParallelSinglePhase(
-      SinglePhaseParallelIndexTaskRunner parallelSinglePhaseRunner,
-      boolean includeUnparseable
-  )
-  {
-    final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
-
-    List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
-
-    // Get stats from completed tasks
-    Map<String, PushedSegmentsReport> completedSubtaskReports = parallelSinglePhaseRunner.getReports();
-    for (PushedSegmentsReport pushedSegmentsReport : completedSubtaskReports.values()) {
-      Map<String, TaskReport> taskReport = pushedSegmentsReport.getTaskReport();
-      if (taskReport == null || taskReport.isEmpty()) {
-        LOG.warn("Got an empty task report from subtask: " + pushedSegmentsReport.getTaskId());
-        continue;
-      }
-      RowIngestionMetersTotals rowIngestionMetersTotals =
-          getBuildSegmentsStatsFromTaskReport(taskReport, includeUnparseable, unparseableEvents);
-
-      buildSegmentsRowStats.addRowIngestionMetersTotals(rowIngestionMetersTotals);
-    }
-
-    RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
-        parallelSinglePhaseRunner.getRunningTaskIds(),
-        unparseableEvents,
-        includeUnparseable
-    );
-    buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
-
-    return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
-  }
-
-  private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEventsParallelMultiPhase(
-      ParallelIndexTaskRunner<?, ?> currentRunner,
-      boolean includeUnparseable
-  )
-  {
-    if (indexGenerateRowStats != null) {
-      return Pair.of(indexGenerateRowStats.lhs, includeUnparseable ? indexGenerateRowStats.rhs : ImmutableMap.of());
-    } else if (!currentRunner.getName().equals("partial segment generation")) {
-      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
-    } else {
-      Map<String, GeneratedPartitionsReport> completedSubtaskReports =
-          (Map<String, GeneratedPartitionsReport>) currentRunner.getReports();
-
-      final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
-      final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
-      for (GeneratedPartitionsReport generatedPartitionsReport : completedSubtaskReports.values()) {
-        Map<String, TaskReport> taskReport = generatedPartitionsReport.getTaskReport();
-        if (taskReport == null || taskReport.isEmpty()) {
-          LOG.warn("Got an empty task report from subtask: " + generatedPartitionsReport.getTaskId());
-          continue;
-        }
-        RowIngestionMetersTotals rowStatsForCompletedTask =
-            getBuildSegmentsStatsFromTaskReport(taskReport, true, unparseableEvents);
-
-        buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask);
-      }
-
-      RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
-          currentRunner.getRunningTaskIds(),
-          unparseableEvents,
-          includeUnparseable
-      );
-      buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
-
-      return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
-    }
-  }
-
-  private RowIngestionMetersTotals getRowStatsAndUnparseableEventsForRunningTasks(
-      Set<String> runningTaskIds,
-      List<ParseExceptionReport> unparseableEvents,
-      boolean includeUnparseable
-  )
+  @Nullable
+  Map<String, Object> fetchTaskReport(String taskId)
   {
-    final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
-    for (String runningTaskId : runningTaskIds) {
-      try {
-        final Map<String, Object> report = getTaskReport(toolbox.getOverlordClient(), runningTaskId);
-
-        if (report == null || report.isEmpty()) {
-          // task does not have a running report yet
-          continue;
-        }
-
-        Map<String, Object> ingestionStatsAndErrors = (Map<String, Object>) report.get("ingestionStatsAndErrors");
-        Map<String, Object> payload = (Map<String, Object>) ingestionStatsAndErrors.get("payload");
-        Map<String, Object> rowStats = (Map<String, Object>) payload.get("rowStats");
-        Map<String, Object> totals = (Map<String, Object>) rowStats.get("totals");
-        Map<String, Object> buildSegments = (Map<String, Object>) totals.get(RowIngestionMeters.BUILD_SEGMENTS);
-
-        if (includeUnparseable) {
-          Map<String, Object> taskUnparseableEvents = (Map<String, Object>) payload.get("unparseableEvents");
-          List<ParseExceptionReport> buildSegmentsUnparseableEvents = (List<ParseExceptionReport>)
-              taskUnparseableEvents.get(RowIngestionMeters.BUILD_SEGMENTS);
-          unparseableEvents.addAll(buildSegmentsUnparseableEvents);
-        }
-
-        buildSegmentsRowStats.addRowIngestionMetersTotals(getTotalsFromBuildSegmentsRowStats(buildSegments));
-      }
-      catch (Exception e) {
-        LOG.warn(e, "Encountered exception when getting live subtask report for task: " + runningTaskId);
-      }
+    try {
+      return getTaskReport(toolbox.getOverlordClient(), taskId);
     }
-    return buildSegmentsRowStats.getTotals();
-  }
-
-  private Pair<Map<String, Object>, Map<String, Object>> createStatsAndErrorsReport(
-      RowIngestionMetersTotals rowStats,
-      List<ParseExceptionReport> unparseableEvents
-  )
-  {
-    Map<String, Object> rowStatsMap = new HashMap<>();
-    Map<String, Object> totalsMap = new HashMap<>();
-    totalsMap.put(RowIngestionMeters.BUILD_SEGMENTS, rowStats);
-    rowStatsMap.put("totals", totalsMap);
-
-    return Pair.of(rowStatsMap, ImmutableMap.of(RowIngestionMeters.BUILD_SEGMENTS, unparseableEvents));
-  }
-
-  private RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(
-      Map<String, TaskReport> taskReport,
-      boolean includeUnparseable,
-      List<ParseExceptionReport> unparseableEvents)
-  {
-    IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport = (IngestionStatsAndErrorsTaskReport) taskReport.get(
-        IngestionStatsAndErrorsTaskReport.REPORT_KEY);
-    IngestionStatsAndErrorsTaskReportData reportData =
-        (IngestionStatsAndErrorsTaskReportData) ingestionStatsAndErrorsReport.getPayload();
-    RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
-        reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
-    );
-    if (includeUnparseable) {
-      List<ParseExceptionReport> taskUnparsebleEvents = (List<ParseExceptionReport>) reportData.getUnparseableEvents()
-                                                                    .get(RowIngestionMeters.BUILD_SEGMENTS);
-      unparseableEvents.addAll(taskUnparsebleEvents);
+    catch (Exception e) {
+      LOG.warn(e, "Encountered exception when getting live subtask report for task: " + taskId);
     }
-    return totals;
+    return null;
   }
 
-  private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEvents(
-      String full,
-      boolean includeUnparseable
-  )
+  private ParallelIndexStats doGetRowStatsAndUnparseableEvents(boolean full, boolean includeUnparseable)
   {
     if (currentSubTaskHolder == null) {
-      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+      return new ParallelIndexStats();
     }
 
     Object currentRunner = currentSubTaskHolder.getTask();
     if (currentRunner == null) {
-      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+      return new ParallelIndexStats();
     }
 
-    if (isParallelMode()) {

Review Comment:
   I think we can retain this if-else flow (but with some clean up as you have done).
   - for sequential, we will just create the `ParallelIndexTaskStats` object here itself.
   - for single phase, we will create a `ParallelIndexStatsReporter` and do the computations
   - for multi phase, we will have a method in this class itself, which checks if it is already cached or not, otherwise, it constructs a `ParallelIndexStatsReporter`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


Re: [PR] add ingested intervals to task report (druid)

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #13758:
URL: https://github.com/apache/druid/pull/13758#issuecomment-1977712882

   This pull request/issue has been closed due to lack of activity. If you think that
   is incorrect, or the pull request requires review, you can revive the PR at any time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


Re: [PR] add ingested intervals to task report (druid)

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #13758: add ingested intervals to task report
URL: https://github.com/apache/druid/pull/13758


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] paul-rogers commented on a diff in pull request #13758: add ingested intervals to task report

Posted by "paul-rogers (via GitHub)" <gi...@apache.org>.
paul-rogers commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1115163884


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java:
##########
@@ -534,7 +536,7 @@ public TaskStatus runTask(final TaskToolbox toolbox)
     catch (Exception e) {
       log.error(e, "Encountered exception in %s.", ingestionState);
       errorMsg = Throwables.getStackTraceAsString(e);
-      toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
+      toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports(Collections.emptyList()));

Review Comment:
   Given the number of times that this empty list pattern occurs, should we just define an overload of `getTaskCompletionReports()` that takes no arguments and calls the one-argument version with an empty list?



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexStatsReporter.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskReport;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class ParallelIndexStatsReporter
+{
+  private static final Logger LOG = new Logger(ParallelIndexStatsReporter.class);
+
+  abstract ParallelIndexStats report(
+      ParallelIndexSupervisorTask task,
+      Object runner,
+      boolean includeUnparseable,
+      String full
+  );
+
+  protected RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(
+      Map<String, TaskReport> taskReport,
+      boolean includeUnparseable,
+      List<ParseExceptionReport> unparseableEvents
+  )
+  {
+    IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport =
+        (IngestionStatsAndErrorsTaskReport) taskReport.get(
+            IngestionStatsAndErrorsTaskReport.REPORT_KEY);
+    IngestionStatsAndErrorsTaskReportData reportData =
+        (IngestionStatsAndErrorsTaskReportData) ingestionStatsAndErrorsReport.getPayload();
+    RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
+        reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
+    );
+    if (includeUnparseable) {
+      List<ParseExceptionReport> taskUnparsebleEvents =
+          (List<ParseExceptionReport>) reportData.getUnparseableEvents().get(RowIngestionMeters.BUILD_SEGMENTS);
+      unparseableEvents.addAll(taskUnparsebleEvents);
+    }
+    return totals;
+  }
+
+  private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object buildSegmentsRowStats)
+  {
+    if (buildSegmentsRowStats instanceof RowIngestionMetersTotals) {
+      // This case is for unit tests. Normally when deserialized the row stats will apppear as a Map<String, Object>.
+      return (RowIngestionMetersTotals) buildSegmentsRowStats;
+    } else if (buildSegmentsRowStats instanceof Map) {
+      Map<String, Object> buildSegmentsRowStatsMap = (Map<String, Object>) buildSegmentsRowStats;
+      return new RowIngestionMetersTotals(
+          ((Number) buildSegmentsRowStatsMap.get("processed")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("processedBytes")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("processedWithError")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue()
+      );
+    } else {
+      // should never happen
+      throw new RuntimeException("Unrecognized buildSegmentsRowStats type: " + buildSegmentsRowStats.getClass());
+    }
+  }
+
+  protected RowIngestionMetersTotals getRowStatsAndUnparseableEventsForRunningTasks(
+      ParallelIndexSupervisorTask task,
+      Set<String> runningTaskIds,
+      List<ParseExceptionReport> unparseableEvents,
+      boolean includeUnparseable
+  )
+  {
+    final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
+    for (String runningTaskId : runningTaskIds) {
+      try {
+        final Map<String, Object> report = task.fetchTaskReport(runningTaskId);
+        if (report == null || report.isEmpty()) {
+          // task does not have a running report yet
+          continue;
+        }
+
+        Map<String, Object> ingestionStatsAndErrors = (Map<String, Object>) report.get("ingestionStatsAndErrors");
+        Map<String, Object> payload = (Map<String, Object>) ingestionStatsAndErrors.get("payload");
+        Map<String, Object> rowStats = (Map<String, Object>) payload.get("rowStats");
+        Map<String, Object> totals = (Map<String, Object>) rowStats.get("totals");

Review Comment:
   More constants?



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexStatsReporter.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskReport;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class ParallelIndexStatsReporter
+{
+  private static final Logger LOG = new Logger(ParallelIndexStatsReporter.class);
+
+  abstract ParallelIndexStats report(
+      ParallelIndexSupervisorTask task,
+      Object runner,
+      boolean includeUnparseable,
+      String full
+  );
+
+  protected RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(
+      Map<String, TaskReport> taskReport,
+      boolean includeUnparseable,
+      List<ParseExceptionReport> unparseableEvents
+  )
+  {
+    IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport =
+        (IngestionStatsAndErrorsTaskReport) taskReport.get(
+            IngestionStatsAndErrorsTaskReport.REPORT_KEY);
+    IngestionStatsAndErrorsTaskReportData reportData =
+        (IngestionStatsAndErrorsTaskReportData) ingestionStatsAndErrorsReport.getPayload();
+    RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
+        reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
+    );
+    if (includeUnparseable) {
+      List<ParseExceptionReport> taskUnparsebleEvents =
+          (List<ParseExceptionReport>) reportData.getUnparseableEvents().get(RowIngestionMeters.BUILD_SEGMENTS);
+      unparseableEvents.addAll(taskUnparsebleEvents);
+    }
+    return totals;
+  }
+
+  private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object buildSegmentsRowStats)
+  {
+    if (buildSegmentsRowStats instanceof RowIngestionMetersTotals) {
+      // This case is for unit tests. Normally when deserialized the row stats will apppear as a Map<String, Object>.
+      return (RowIngestionMetersTotals) buildSegmentsRowStats;
+    } else if (buildSegmentsRowStats instanceof Map) {
+      Map<String, Object> buildSegmentsRowStatsMap = (Map<String, Object>) buildSegmentsRowStats;
+      return new RowIngestionMetersTotals(
+          ((Number) buildSegmentsRowStatsMap.get("processed")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("processedBytes")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("processedWithError")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue()

Review Comment:
   Can we define constants for these names?



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexStatsReporter.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskReport;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class MultiPhaseParallelIndexStatsReporter extends ParallelIndexStatsReporter
+{
+  private static final Logger LOG = new Logger(MultiPhaseParallelIndexStatsReporter.class);
+
+  @Override
+  ParallelIndexStats report(
+      ParallelIndexSupervisorTask task,
+      Object runner,
+      boolean includeUnparseable,
+      String full
+  )
+  {
+    // use cached version if available
+    ParallelIndexStats cached = task.getIndexGenerateRowStats();
+    if (null != cached) {
+      return cached;
+    }
+
+    ParallelIndexTaskRunner<?, ?> currentRunner = (ParallelIndexTaskRunner<?, ?>) runner;
+    if (!currentRunner.getName().equals("partial segment generation")) {

Review Comment:
   Better, should we declare a constant and use it where we first set the name and here? Will save grief later if someone decides to change the text.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexStatsReporter.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskReport;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class ParallelIndexStatsReporter
+{
+  private static final Logger LOG = new Logger(ParallelIndexStatsReporter.class);
+
+  abstract ParallelIndexStats report(
+      ParallelIndexSupervisorTask task,
+      Object runner,
+      boolean includeUnparseable,
+      String full
+  );
+
+  protected RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(
+      Map<String, TaskReport> taskReport,
+      boolean includeUnparseable,
+      List<ParseExceptionReport> unparseableEvents
+  )
+  {
+    IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport =
+        (IngestionStatsAndErrorsTaskReport) taskReport.get(
+            IngestionStatsAndErrorsTaskReport.REPORT_KEY);
+    IngestionStatsAndErrorsTaskReportData reportData =
+        (IngestionStatsAndErrorsTaskReportData) ingestionStatsAndErrorsReport.getPayload();
+    RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
+        reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
+    );
+    if (includeUnparseable) {
+      List<ParseExceptionReport> taskUnparsebleEvents =
+          (List<ParseExceptionReport>) reportData.getUnparseableEvents().get(RowIngestionMeters.BUILD_SEGMENTS);
+      unparseableEvents.addAll(taskUnparsebleEvents);
+    }
+    return totals;
+  }
+
+  private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object buildSegmentsRowStats)
+  {
+    if (buildSegmentsRowStats instanceof RowIngestionMetersTotals) {
+      // This case is for unit tests. Normally when deserialized the row stats will apppear as a Map<String, Object>.
+      return (RowIngestionMetersTotals) buildSegmentsRowStats;
+    } else if (buildSegmentsRowStats instanceof Map) {
+      Map<String, Object> buildSegmentsRowStatsMap = (Map<String, Object>) buildSegmentsRowStats;
+      return new RowIngestionMetersTotals(
+          ((Number) buildSegmentsRowStatsMap.get("processed")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("processedBytes")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("processedWithError")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(),
+          ((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue()
+      );
+    } else {
+      // should never happen
+      throw new RuntimeException("Unrecognized buildSegmentsRowStats type: " + buildSegmentsRowStats.getClass());

Review Comment:
   Nit: 
   
   ```java
       throw ISE(
             "Unrecognized buildSegmentsRowStats type: [%s]",
             buildSegmentsRowStats.getClass().getName()
       );
   ````
   
   ISE = Illegal State Exception
   
   Has a formatter built in. Note Druid likes its brackets for interpolated values.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexStatsReporter.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
+import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
+import org.apache.druid.indexing.common.TaskReport;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.incremental.ParseExceptionReport;
+import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class MultiPhaseParallelIndexStatsReporter extends ParallelIndexStatsReporter
+{
+  private static final Logger LOG = new Logger(MultiPhaseParallelIndexStatsReporter.class);
+
+  @Override
+  ParallelIndexStats report(
+      ParallelIndexSupervisorTask task,
+      Object runner,
+      boolean includeUnparseable,
+      String full
+  )
+  {
+    // use cached version if available
+    ParallelIndexStats cached = task.getIndexGenerateRowStats();
+    if (null != cached) {
+      return cached;
+    }
+
+    ParallelIndexTaskRunner<?, ?> currentRunner = (ParallelIndexTaskRunner<?, ?>) runner;
+    if (!currentRunner.getName().equals("partial segment generation")) {

Review Comment:
   Druid kinda likes to reverse these comparisons for null protection:
   
   ```java
     if (!"constant".equals(variable)) {
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -767,7 +770,8 @@ TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Except
       );
       return TaskStatus.failure(getId(), errMsg);
     }
-    indexGenerateRowStats = doGetRowStatsAndUnparseableEventsParallelMultiPhase(indexingRunner, true);
+
+    indexGenerateRowStats = new MultiPhaseParallelIndexStatsReporter().report(this, indexingRunner, true, "full");

Review Comment:
   Yet another constant?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] kfaraz commented on a diff in pull request #13758: add ingested intervals to task report

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1183522014


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java:
##########
@@ -1540,201 +1548,31 @@ public Response getCompleteSubTaskSpecAttemptHistory(
     }
   }
 
-  private RowIngestionMetersTotals getTotalsFromBuildSegmentsRowStats(Object buildSegmentsRowStats)
-  {
-    if (buildSegmentsRowStats instanceof RowIngestionMetersTotals) {
-      // This case is for unit tests. Normally when deserialized the row stats will apppear as a Map<String, Object>.
-      return (RowIngestionMetersTotals) buildSegmentsRowStats;
-    } else if (buildSegmentsRowStats instanceof Map) {
-      Map<String, Object> buildSegmentsRowStatsMap = (Map<String, Object>) buildSegmentsRowStats;
-      return new RowIngestionMetersTotals(
-          ((Number) buildSegmentsRowStatsMap.get("processed")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("processedBytes")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("processedWithError")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("thrownAway")).longValue(),
-          ((Number) buildSegmentsRowStatsMap.get("unparseable")).longValue()
-      );
-    } else {
-      // should never happen
-      throw new RuntimeException("Unrecognized buildSegmentsRowStats type: " + buildSegmentsRowStats.getClass());
-    }
-  }
-
-  private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEventsParallelSinglePhase(
-      SinglePhaseParallelIndexTaskRunner parallelSinglePhaseRunner,
-      boolean includeUnparseable
-  )
-  {
-    final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
-
-    List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
-
-    // Get stats from completed tasks
-    Map<String, PushedSegmentsReport> completedSubtaskReports = parallelSinglePhaseRunner.getReports();
-    for (PushedSegmentsReport pushedSegmentsReport : completedSubtaskReports.values()) {
-      Map<String, TaskReport> taskReport = pushedSegmentsReport.getTaskReport();
-      if (taskReport == null || taskReport.isEmpty()) {
-        LOG.warn("Got an empty task report from subtask: " + pushedSegmentsReport.getTaskId());
-        continue;
-      }
-      RowIngestionMetersTotals rowIngestionMetersTotals =
-          getBuildSegmentsStatsFromTaskReport(taskReport, includeUnparseable, unparseableEvents);
-
-      buildSegmentsRowStats.addRowIngestionMetersTotals(rowIngestionMetersTotals);
-    }
-
-    RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
-        parallelSinglePhaseRunner.getRunningTaskIds(),
-        unparseableEvents,
-        includeUnparseable
-    );
-    buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
-
-    return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
-  }
-
-  private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEventsParallelMultiPhase(
-      ParallelIndexTaskRunner<?, ?> currentRunner,
-      boolean includeUnparseable
-  )
-  {
-    if (indexGenerateRowStats != null) {
-      return Pair.of(indexGenerateRowStats.lhs, includeUnparseable ? indexGenerateRowStats.rhs : ImmutableMap.of());
-    } else if (!currentRunner.getName().equals("partial segment generation")) {
-      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
-    } else {
-      Map<String, GeneratedPartitionsReport> completedSubtaskReports =
-          (Map<String, GeneratedPartitionsReport>) currentRunner.getReports();
-
-      final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
-      final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
-      for (GeneratedPartitionsReport generatedPartitionsReport : completedSubtaskReports.values()) {
-        Map<String, TaskReport> taskReport = generatedPartitionsReport.getTaskReport();
-        if (taskReport == null || taskReport.isEmpty()) {
-          LOG.warn("Got an empty task report from subtask: " + generatedPartitionsReport.getTaskId());
-          continue;
-        }
-        RowIngestionMetersTotals rowStatsForCompletedTask =
-            getBuildSegmentsStatsFromTaskReport(taskReport, true, unparseableEvents);
-
-        buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask);
-      }
-
-      RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
-          currentRunner.getRunningTaskIds(),
-          unparseableEvents,
-          includeUnparseable
-      );
-      buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
-
-      return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
-    }
-  }
-
-  private RowIngestionMetersTotals getRowStatsAndUnparseableEventsForRunningTasks(
-      Set<String> runningTaskIds,
-      List<ParseExceptionReport> unparseableEvents,
-      boolean includeUnparseable
-  )
+  @Nullable
+  Map<String, Object> fetchTaskReport(String taskId)
   {
-    final SimpleRowIngestionMeters buildSegmentsRowStats = new SimpleRowIngestionMeters();
-    for (String runningTaskId : runningTaskIds) {
-      try {
-        final Map<String, Object> report = getTaskReport(toolbox.getOverlordClient(), runningTaskId);
-
-        if (report == null || report.isEmpty()) {
-          // task does not have a running report yet
-          continue;
-        }
-
-        Map<String, Object> ingestionStatsAndErrors = (Map<String, Object>) report.get("ingestionStatsAndErrors");
-        Map<String, Object> payload = (Map<String, Object>) ingestionStatsAndErrors.get("payload");
-        Map<String, Object> rowStats = (Map<String, Object>) payload.get("rowStats");
-        Map<String, Object> totals = (Map<String, Object>) rowStats.get("totals");
-        Map<String, Object> buildSegments = (Map<String, Object>) totals.get(RowIngestionMeters.BUILD_SEGMENTS);
-
-        if (includeUnparseable) {
-          Map<String, Object> taskUnparseableEvents = (Map<String, Object>) payload.get("unparseableEvents");
-          List<ParseExceptionReport> buildSegmentsUnparseableEvents = (List<ParseExceptionReport>)
-              taskUnparseableEvents.get(RowIngestionMeters.BUILD_SEGMENTS);
-          unparseableEvents.addAll(buildSegmentsUnparseableEvents);
-        }
-
-        buildSegmentsRowStats.addRowIngestionMetersTotals(getTotalsFromBuildSegmentsRowStats(buildSegments));
-      }
-      catch (Exception e) {
-        LOG.warn(e, "Encountered exception when getting live subtask report for task: " + runningTaskId);
-      }
+    try {
+      return getTaskReport(toolbox.getOverlordClient(), taskId);
     }
-    return buildSegmentsRowStats.getTotals();
-  }
-
-  private Pair<Map<String, Object>, Map<String, Object>> createStatsAndErrorsReport(
-      RowIngestionMetersTotals rowStats,
-      List<ParseExceptionReport> unparseableEvents
-  )
-  {
-    Map<String, Object> rowStatsMap = new HashMap<>();
-    Map<String, Object> totalsMap = new HashMap<>();
-    totalsMap.put(RowIngestionMeters.BUILD_SEGMENTS, rowStats);
-    rowStatsMap.put("totals", totalsMap);
-
-    return Pair.of(rowStatsMap, ImmutableMap.of(RowIngestionMeters.BUILD_SEGMENTS, unparseableEvents));
-  }
-
-  private RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(
-      Map<String, TaskReport> taskReport,
-      boolean includeUnparseable,
-      List<ParseExceptionReport> unparseableEvents)
-  {
-    IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport = (IngestionStatsAndErrorsTaskReport) taskReport.get(
-        IngestionStatsAndErrorsTaskReport.REPORT_KEY);
-    IngestionStatsAndErrorsTaskReportData reportData =
-        (IngestionStatsAndErrorsTaskReportData) ingestionStatsAndErrorsReport.getPayload();
-    RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
-        reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
-    );
-    if (includeUnparseable) {
-      List<ParseExceptionReport> taskUnparsebleEvents = (List<ParseExceptionReport>) reportData.getUnparseableEvents()
-                                                                    .get(RowIngestionMeters.BUILD_SEGMENTS);
-      unparseableEvents.addAll(taskUnparsebleEvents);
+    catch (Exception e) {
+      LOG.warn(e, "Encountered exception when getting live subtask report for task: " + taskId);
     }
-    return totals;
+    return null;
   }
 
-  private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEvents(
-      String full,
-      boolean includeUnparseable
-  )
+  private ParallelIndexStats doGetRowStatsAndUnparseableEvents(boolean full, boolean includeUnparseable)
   {
     if (currentSubTaskHolder == null) {
-      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+      return new ParallelIndexStats();
     }
 
     Object currentRunner = currentSubTaskHolder.getTask();
     if (currentRunner == null) {
-      return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+      return new ParallelIndexStats();
     }
 
-    if (isParallelMode()) {

Review Comment:
   I think we can retain this if-else flow (but with some of the clean up that you have done).
   - for sequential, we will just create the `ParallelIndexTaskStats` object here itself.
   - for single phase, we will create a `ParallelIndexStatsReporter` and do the computations
   - for multi phase, we will have a method in this class itself, which checks if it is already cached and checks if the current phase is the correct one, otherwise, it constructs a `ParallelIndexStatsReporter`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] danprince1 commented on a diff in pull request #13758: add ingested intervals to task report

Posted by "danprince1 (via GitHub)" <gi...@apache.org>.
danprince1 commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1174076521


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexStatsReporterFactory.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
+
+public class ParallelIndexStatsReporterFactory
+{
+  ParallelIndexStatsReporter create(ParallelIndexSupervisorTask task)
+  {
+    if (task.isParallelMode()) {
+      if (AbstractBatchIndexTask.isGuaranteedRollup(

Review Comment:
   Good idea - done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] danprince1 commented on a diff in pull request #13758: add ingested intervals to task report

Posted by "danprince1 (via GitHub)" <gi...@apache.org>.
danprince1 commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1174073728


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java:
##########
@@ -246,7 +249,8 @@ private Map<String, TaskReport> getTaskCompletionReports()
                 getTaskCompletionRowStats(),
                 "",
                 false, // not applicable for parallel subtask
-                segmentAvailabilityWaitTimeMs
+                segmentAvailabilityWaitTimeMs,
+                intervals

Review Comment:
   The subtasks don't need to condense intervals, because the parent task gathers all intervals from subtasks, then condenses them. See ParallelIndexSupervisorTask line 1249.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] danprince1 commented on a diff in pull request #13758: add ingested intervals to task report

Posted by "danprince1 (via GitHub)" <gi...@apache.org>.
danprince1 commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1174037386


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java:
##########
@@ -573,7 +580,8 @@ private Map<String, TaskReport> getTaskCompletionReports()
                 getTaskCompletionRowStats(),
                 errorMsg,
                 segmentAvailabilityConfirmationCompleted,
-                segmentAvailabilityWaitTimeMs
+                segmentAvailabilityWaitTimeMs,
+                JodaUtils.condenseIntervals(intervals)

Review Comment:
   Hi, thanks for taking a look at this PR!
   
   Making `ingestedIntervals` an attribute of the Task doesn't feel right to me because this is a very transient thing.  The instance variable would be empty all the time except for a vanishingly small period of time after `generateAndPublishSegments()` completes and before the task is torn down.  We only pass intervals to `getTaskCompletionReports()` in one place - we would call the setter and then immediately call `getTaskCompletionReports()`, which would call the getter.  And no other entity has any interest in these intervals - they are not really a property of the Task.
   
   Let me know what you think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] danprince1 commented on a diff in pull request #13758: add ingested intervals to task report

Posted by "danprince1 (via GitHub)" <gi...@apache.org>.
danprince1 commented on code in PR #13758:
URL: https://github.com/apache/druid/pull/13758#discussion_r1174046572


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java:
##########
@@ -651,7 +654,8 @@ private Map<String, TaskReport> getTaskCompletionReports()
                 getTaskCompletionRowStats(),
                 errorMsg,
                 false, // not applicable for parallel subtask
-                segmentAvailabilityWaitTimeMs
+                segmentAvailabilityWaitTimeMs,
+                ingestedIntervals

Review Comment:
   The subtasks don't need to condense intervals, because the parent task gathers all intervals from subtasks, then condenses them.  See `ParallelIndexSupervisorTask` line 1249.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org