Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/09/22 10:37:05 UTC

[GitHub] [druid] jon-wei opened a new pull request #10419: Automatically determine numShards for parallel ingestion hash partitioning

jon-wei opened a new pull request #10419:
URL: https://github.com/apache/druid/pull/10419


   This PR allows parallel batch ingestion to automatically determine `numShards` when `forceGuaranteedRollup` is used with hash partitioning.
   
   This is accomplished with a new phase of subtasks (`PartialDimensionCardinalityTask`). These subtasks each build a map of `<Interval, HyperLogLogCollector>`, where each HLL collector records the cardinality of the partitioning dimensions for one segment-granularity interval of the input data.
   
   The supervisor task aggregates the HLL collectors by interval, and determines the highest cardinality across all the intervals. This max cardinality is divided by `maxRowsPerSegment` to determine `numShards` automatically.
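
   A rough sketch of this flow is below (illustrative only: the class and method names are made up for this description and are not the PR's actual API; the real logic lives in the new subtask and in `ParallelIndexSupervisorTask`):
   
   import com.google.common.hash.Hashing;
   import org.apache.druid.hll.HyperLogLogCollector;
   import org.joda.time.Interval;
   
   import java.nio.charset.StandardCharsets;
   import java.util.HashMap;
   import java.util.Map;
   
   class NumShardsSketch
   {
     // Phase 1 (each cardinality subtask): record the partition-dimension group key of
     // every row into the HLL collector for that row's segment-granularity interval.
     static void addRow(Map<Interval, HyperLogLogCollector> collectors, Interval interval, String groupKey)
     {
       collectors
           .computeIfAbsent(interval, i -> HyperLogLogCollector.makeLatestCollector())
           .add(Hashing.murmur3_128().hashString(groupKey, StandardCharsets.UTF_8).asBytes());
     }
   
     // Phase 2 (supervisor task): fold the per-worker collectors interval by interval,
     // take the largest estimate, and divide by maxRowsPerSegment.
     static int determineNumShards(Iterable<Map<Interval, HyperLogLogCollector>> reports, int maxRowsPerSegment)
     {
       Map<Interval, HyperLogLogCollector> merged = new HashMap<>();
       for (Map<Interval, HyperLogLogCollector> report : reports) {
         report.forEach(
             (interval, collector) ->
                 merged.computeIfAbsent(interval, i -> HyperLogLogCollector.makeLatestCollector()).fold(collector)
         );
       }
       double maxCardinality = 0;
       for (HyperLogLogCollector collector : merged.values()) {
         maxCardinality = Math.max(maxCardinality, collector.estimateCardinality());
       }
       return Math.max(1, (int) Math.ceil(maxCardinality / maxRowsPerSegment));
     }
   }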
   
   This PR has:
   - [x] been self-reviewed.
   - [ ] added documentation for new or modified features or behaviors.
   - [x] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/licenses.yaml)
   - [x] added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
   - [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [x] been tested in a test Druid cluster.




[GitHub] [druid] jon-wei commented on a change in pull request #10419: Automatically determine numShards for parallel ingestion hash partitioning

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #10419:
URL: https://github.com/apache/druid/pull/10419#discussion_r493117965



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.HandlingInputRowIterator;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
+import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
+{
+  public static final String TYPE = "partial_dimension_cardinality";
+  private static final Logger LOG = new Logger(PartialDimensionCardinalityTask.class);
+
+  private final int numAttempts;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final String supervisorTaskId;
+
+  private final ObjectMapper jsonMapper;
+
+  @JsonCreator
+  PartialDimensionCardinalityTask(
+      // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask
+      @JsonProperty("id") @Nullable String id,
+      @JsonProperty("groupId") final String groupId,
+      @JsonProperty("resource") final TaskResource taskResource,
+      @JsonProperty("supervisorTaskId") final String supervisorTaskId,
+      @JsonProperty("numAttempts") final int numAttempts, // zero-based counting
+      @JsonProperty("spec") final ParallelIndexIngestionSpec ingestionSchema,
+      @JsonProperty("context") final Map<String, Object> context,
+      @JacksonInject ObjectMapper jsonMapper
+  )
+  {
+    super(
+        getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()),
+        groupId,
+        taskResource,
+        ingestionSchema.getDataSchema(),
+        ingestionSchema.getTuningConfig(),
+        context
+    );
+
+    Preconditions.checkArgument(
+        ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec,
+        "%s partitionsSpec required",
+        HashedPartitionsSpec.NAME
+    );
+
+    this.numAttempts = numAttempts;
+    this.ingestionSchema = ingestionSchema;
+    this.supervisorTaskId = supervisorTaskId;
+    this.jsonMapper = jsonMapper;
+  }
+
+  @JsonProperty
+  private int getNumAttempts()
+  {
+    return numAttempts;
+  }
+
+  @JsonProperty("spec")
+  private ParallelIndexIngestionSpec getIngestionSchema()
+  {
+    return ingestionSchema;
+  }
+
+  @JsonProperty
+  private String getSupervisorTaskId()
+  {
+    return supervisorTaskId;
+  }
+
+  @Override
+  public String getType()
+  {
+    return TYPE;
+  }
+
+  @Override
+  public boolean isReady(TaskActionClient taskActionClient) throws Exception
+  {
+    return tryTimeChunkLock(
+        taskActionClient,
+        getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+    );
+  }
+
+  @Override
+  public TaskStatus runTask(TaskToolbox toolbox) throws Exception
+  {
+    DataSchema dataSchema = ingestionSchema.getDataSchema();
+    GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
+    ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
+
+    HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) tuningConfig.getPartitionsSpec();
+    Preconditions.checkNotNull(partitionsSpec, "partitionsSpec required in tuningConfig");
+
+    List<String> partitionDimensions = partitionsSpec.getPartitionDimensions();
+    if (partitionDimensions == null) {
+      partitionDimensions = HashBasedNumberedShardSpec.DEFAULT_PARTITION_DIMENSIONS;
+    }
+
+    InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
+        ingestionSchema.getDataSchema().getParser()
+    );
+    InputFormat inputFormat = inputSource.needsFormat()
+                              ? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema)
+                              : null;
+    final RowIngestionMeters buildSegmentsMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
+    final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+        buildSegmentsMeters,
+        tuningConfig.isLogParseExceptions(),
+        tuningConfig.getMaxParseExceptions(),
+        tuningConfig.getMaxSavedParseExceptions()
+    );
+
+    try (
+        final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
+            toolbox.getIndexingTmpDir(),
+            dataSchema,
+            inputSource,
+            inputFormat,
+            AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
+            buildSegmentsMeters,
+            parseExceptionHandler
+        );
+        HandlingInputRowIterator iterator =
+            new DefaultIndexTaskInputRowIteratorBuilder()
+                .delegate(inputRowIterator)
+                .granularitySpec(granularitySpec)
+                .build()
+    ) {
+      Map<Interval, byte[]> cardinalities = determineCardinalities(
+          iterator,
+          granularitySpec,
+          partitionDimensions
+      );
+
+      sendReport(
+          toolbox,
+          new DimensionCardinalityReport(getId(), cardinalities)
+      );
+    }
+
+    return TaskStatus.success(getId());
+  }
+
+  private Map<Interval, byte[]> determineCardinalities(
+      HandlingInputRowIterator inputRowIterator,
+      GranularitySpec granularitySpec,
+      List<String> partitionDimensions
+  )
+  {
+    Map<Interval, HyperLogLogCollector> intervalToCardinalities = new HashMap<>();
+    while (inputRowIterator.hasNext()) {
+      InputRow inputRow = inputRowIterator.next();
+      if (inputRow == null) {
+        continue;
+      }
+
+      DateTime timestamp = inputRow.getTimestamp();
+
+      //noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals)
+      Interval interval = granularitySpec.bucketInterval(timestamp).get();
+
+      LOG.info("TS: " + timestamp + " INTV: " + interval + " GSC: " + granularitySpec.getClass());
+
+      HyperLogLogCollector hllCollector = intervalToCardinalities.computeIfAbsent(

Review comment:
       > Using HllSketch would mean that the implementation for parallel ingestion is different from the one for sequential ingestion though.
   
   That was my reasoning for using `HyperLogLogCollector`, but I think it makes sense to change these to use `HllSketch`. I'll update this one to use `HllSketch` and a follow-on could be to do the same for `IndexTask`.
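
A minimal sketch of what an `HllSketch`-based `determineCardinalities` loop could look like, assuming it reuses `HashBasedNumberedShardSpec.getGroupKey` and `DimensionCardinalityReport.HLL_SKETCH_LOG_K` from elsewhere in this PR (exception handling omitted; the merged code may differ in details):

// Assumes org.apache.datasketches.hll.HllSketch replaces org.apache.druid.hll.HyperLogLogCollector here.
Map<Interval, HllSketch> intervalToSketch = new HashMap<>();
while (inputRowIterator.hasNext()) {
  InputRow inputRow = inputRowIterator.next();
  if (inputRow == null) {
    continue;
  }
  DateTime timestamp = inputRow.getTimestamp();
  //noinspection OptionalGetWithoutIsPresent (rows from this iterator always have a bucket interval)
  Interval interval = granularitySpec.bucketInterval(timestamp).get();

  HllSketch sketch = intervalToSketch.computeIfAbsent(
      interval,
      i -> new HllSketch(DimensionCardinalityReport.HLL_SKETCH_LOG_K)
  );
  List<Object> groupKey = HashBasedNumberedShardSpec.getGroupKey(
      partitionDimensions,
      timestamp.getMillis(),
      inputRow
  );
  // writeValueAsBytes can throw JsonProcessingException; handling is omitted in this sketch.
  sketch.update(jsonMapper.writeValueAsBytes(groupKey));
}
// Each sketch is then serialized (e.g. sketch.toCompactByteArray()) into the
// Map<Interval, byte[]> carried by the DimensionCardinalityReport.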
   
   






[GitHub] [druid] jon-wei commented on a change in pull request #10419: Automatically determine numShards for parallel ingestion hash partitioning

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #10419:
URL: https://github.com/apache/druid/pull/10419#discussion_r493939426



##########
File path: docs/ingestion/native-batch.md
##########
@@ -294,11 +294,17 @@ How the worker task creates segments is:
 |property|description|default|required?|
 |--------|-----------|-------|---------|
 |type|This should always be `hashed`|none|yes|
-|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data.|null|yes|
+|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data.|null|no|

Review comment:
    I don't think I added any new parameters to `HashedPartitionsSpec`. Were you referring to something else?






[GitHub] [druid] jon-wei commented on a change in pull request #10419: Automatically determine numShards for parallel ingestion hash partitioning

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #10419:
URL: https://github.com/apache/druid/pull/10419#discussion_r493977792



##########
File path: docs/ingestion/native-batch.md
##########
@@ -294,11 +294,17 @@ How the worker task creates segments is:
 |property|description|default|required?|
 |--------|-----------|-------|---------|
 |type|This should always be `hashed`|none|yes|
-|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data.|null|yes|
+|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data.|null|no|
 |partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions.|null|no|
 
 The Parallel task with hash-based partitioning is similar to [MapReduce](https://en.wikipedia.org/wiki/MapReduce).
-The task runs in 2 phases, i.e., `partial segment generation` and `partial segment merge`.
+The task runs in up to 3 phases: `partial_dimension_cardinality`, `partial segment generation` and `partial segment merge`.
+- The `partial_dimension_cardinality` phase is an optional phase that only runs if `numShards` is not specified.
+The Parallel task splits the input data and assigns them to worker tasks based on the split hint spec.
+Each worker task (type `partial_dimension_cardinality`) gathers estimates of partitioning dimensions cardinality for
+each time chunk. The Parallel task will aggregate these estimates from the worker tasks and determine the highest
+cardinality across all of the time chunks in the input data, dividing this cardinality by `maxRowsPerSegment` to

Review comment:
       I went with `targetRowsPerSegment`
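
As an illustrative example (numbers made up): if the largest per-time-chunk cardinality estimate is about 5,000,000 and `targetRowsPerSegment` is 1,000,000, the supervisor ends up with numShards = 5,000,000 / 1,000,000 = 5, i.e. five hash-partitioned segments per time chunk.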






[GitHub] [druid] ccaominh commented on a change in pull request #10419: Automatically determine numShards for parallel ingestion hash partitioning

Posted by GitBox <gi...@apache.org>.
ccaominh commented on a change in pull request #10419:
URL: https://github.com/apache/druid/pull/10419#discussion_r493920983



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
##########
+  @Override
+  public String getType()
+  {
+    return TYPE;
+  }
+
+  @Override
+  public boolean isReady(TaskActionClient taskActionClient) throws Exception
+  {
+    return tryTimeChunkLock(
+        taskActionClient,
+        getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+    );
+  }

Review comment:
    I seem to remember running into the self-deadlock issue when implementing https://github.com/apache/druid/pull/8925, which is why the locks are only acquired in the first phase of the range partitioning subtasks. I don't remember whether I discovered this via `RangePartitionMultiPhaseParallelIndexingTest` or `ITPerfectRollupParallelIndexTest`, but if the problem isn't showing up in this PR's tests, then that's good.






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10419: Automatically determine numShards for parallel ingestion hash partitioning

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10419:
URL: https://github.com/apache/druid/pull/10419#discussion_r493391588



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -582,6 +652,50 @@ private TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) thro
     return TaskStatus.fromCode(getId(), mergeState);
   }
 
+  @VisibleForTesting
+  public static int determineNumShardsFromCardinalityReport(
+      Collection<DimensionCardinalityReport> reports,
+      int maxRowsPerSegment
+  )
+  {
+    // aggregate all the sub-reports
+    Map<Interval, Union> finalCollectors = new HashMap<>();
+    reports.forEach(report -> {
+      Map<Interval, byte[]> intervalToCardinality = report.getIntervalToCardinalities();
+      for (Map.Entry<Interval, byte[]> entry : intervalToCardinality.entrySet()) {
+        Union union = finalCollectors.computeIfAbsent(
+            entry.getKey(),
+            (key) -> {
+              return new Union(DimensionCardinalityReport.HLL_SKETCH_LOG_K);
+            }
+        );
+        HllSketch entryHll = HllSketch.wrap(Memory.wrap(entry.getValue()));
+        union.update(entryHll);
+      }
+    });
+
+    // determine the highest cardinality in any interval
+    long maxCardinality = Long.MIN_VALUE;

Review comment:
       how about using `0` instead here? 
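
For context, the tail of the method quoted above turns these per-interval unions into `numShards` roughly as follows (a sketch only; the merged code may differ in details), so starting the running maximum at `0` is a harmless floor when there happen to be no reports:

// Sketch of the remainder of determineNumShardsFromCardinalityReport (may differ from the merged code).
long maxCardinality = 0; // 0 rather than Long.MIN_VALUE: stays sane if finalCollectors is empty
for (Union union : finalCollectors.values()) {
  maxCardinality = Math.max(maxCardinality, (long) union.getResult().getEstimate());
}
return Math.max(1, (int) Math.ceil((double) maxCardinality / maxRowsPerSegment));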






[GitHub] [druid] jon-wei merged pull request #10419: Automatically determine numShards for parallel ingestion hash partitioning

Posted by GitBox <gi...@apache.org>.
jon-wei merged pull request #10419:
URL: https://github.com/apache/druid/pull/10419


   




[GitHub] [druid] jon-wei commented on a change in pull request #10419: Automatically determine numShards for parallel ingestion hash partitioning

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #10419:
URL: https://github.com/apache/druid/pull/10419#discussion_r493977564



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
+    // determine the highest cardinality in any interval
+    long maxCardinality = Long.MIN_VALUE;

Review comment:
       Changed this to 0






[GitHub] [druid] jon-wei commented on a change in pull request #10419: Automatically determine numShards for parallel ingestion hash partitioning

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #10419:
URL: https://github.com/apache/druid/pull/10419#discussion_r493117965



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
##########
+      DateTime timestamp = inputRow.getTimestamp();
+
+      //noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals)
+      Interval interval = granularitySpec.bucketInterval(timestamp).get();
+
+      LOG.info("TS: " + timestamp + " INTV: " + interval + " GSC: " + granularitySpec.getClass());

Review comment:
    I'll remove this; it was a debugging-only message that I don't think should be retained.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
##########
+  @Override
+  public boolean isReady(TaskActionClient taskActionClient) throws Exception
+  {
+    return tryTimeChunkLock(
+        taskActionClient,
+        getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+    );
+  }

Review comment:
       Thanks, will look into this

##########
File path: core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java
##########
@@ -160,7 +160,7 @@ public static int hash(ObjectMapper jsonMapper, List<String> partitionDimensions
   }
 
   @VisibleForTesting
-  static List<Object> getGroupKey(final List<String> partitionDimensions, final long timestamp, final InputRow inputRow)
+  public static List<Object> getGroupKey(final List<String> partitionDimensions, final long timestamp, final InputRow inputRow)

Review comment:
       Removed the `@VisibleForTesting`

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -499,17 +518,62 @@ private TaskStatus runMultiPhaseParallel(TaskToolbox toolbox) throws Exception
 
   private TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Exception
   {
+    TaskState state;
+
+    if (!(ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec)) {
+      // only range and hash partitioning is supported for multiphase parallel ingestion, see runMultiPhaseParallel()
+      throw new ISE(
+          "forceGuaranteedRollup is set but partitionsSpec [%s] is not a ranged or hash partition spec.",

Review comment:
       Changed the message to use "single_dim"

+      Interval interval = granularitySpec.bucketInterval(timestamp).get();
+
+      LOG.info("TS: " + timestamp + " INTV: " + interval + " GSC: " + granularitySpec.getClass());

Review comment:
       This debug logging line has been removed.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
##########
@@ -0,0 +1,258 @@
+  private Map<Interval, byte[]> determineCardinalities(
+      HandlingInputRowIterator inputRowIterator,
+      GranularitySpec granularitySpec,
+      List<String> partitionDimensions
+  )
+  {
+    Map<Interval, HyperLogLogCollector> intervalToCardinalities = new HashMap<>();
+    while (inputRowIterator.hasNext()) {
+      InputRow inputRow = inputRowIterator.next();
+      if (inputRow == null) {
+        continue;
+      }
+
+      DateTime timestamp = inputRow.getTimestamp();
+
+      //noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals)
+      Interval interval = granularitySpec.bucketInterval(timestamp).get();
+
+      LOG.info("TS: " + timestamp + " INTV: " + interval + " GSC: " + granularitySpec.getClass());
+
+      HyperLogLogCollector hllCollector = intervalToCardinalities.computeIfAbsent(

Review comment:
       I've updated this to use HllSketch instead
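
   For readers following along, here is a minimal, self-contained sketch of how the DataSketches `HllSketch` can estimate cardinality and how per-task sketches can be merged; the `lgK` value of 11 and the sample keys are illustrative only, not the exact configuration used in this PR:

```java
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.Union;

public class HllSketchCardinalityExample
{
  public static void main(String[] args)
  {
    // One sketch per time-chunk interval in the real task; a single sketch here for brevity.
    HllSketch sketch = new HllSketch(11); // lgK = 11 is an illustrative size

    // Feed serialized group keys (partition-dimension values) into the sketch.
    for (String groupKey : new String[]{"a|2020-01-01", "b|2020-01-01", "a|2020-01-01"}) {
      sketch.update(groupKey);
    }

    // The estimate approximates the number of distinct group keys seen.
    System.out.println("estimated cardinality = " + sketch.getEstimate());

    // Sketches serialized by different worker tasks can be merged with a Union,
    // which is how a supervisor could aggregate per-interval reports.
    Union union = new Union(11);
    union.update(sketch);
    union.update(HllSketch.heapify(sketch.toCompactByteArray()));
    System.out.println("merged estimate = " + union.getResult().getEstimate());
  }
}
```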

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
##########
@@ -0,0 +1,258 @@
+  @Override
+  public boolean isReady(TaskActionClient taskActionClient) throws Exception
+  {
+    return tryTimeChunkLock(
+        taskActionClient,
+        getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+    );
+  }

Review comment:
       I didn't see any failures when I was testing this on a cluster or in unit tests with the current locking (maybe related to the javadoc comment on isReady(): "This method must be idempotent, as it may be run multiple times per task."?).
   
   I updated `PartialHashSegmentGenerateTask.isReady()` to skip the lock acquisition if `numShardsOverride` is set (indicating that the cardinality phase ran).
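
   For illustration only — the field name `numShardsOverride` and the exact placement are assumptions on my part, and a later comment in this thread notes the locking was ultimately added back — the skip described above could take roughly this shape:

```java
// Illustrative sketch, not the actual PR code.
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
  if (numShardsOverride != null) {
    // Assumption: the supervisor already determined numShards during the
    // cardinality phase, so this worker skips re-acquiring time-chunk locks.
    return true;
  }
  return tryTimeChunkLock(
      taskActionClient,
      getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
  );
}
```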

##########
File path: core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java
##########
@@ -160,7 +160,7 @@ public Integer getNumShards()
   @Override
   public String getForceGuaranteedRollupIncompatiblityReason()
   {
-    return getNumShards() == null ? NUM_SHARDS + " must be specified" : FORCE_GUARANTEED_ROLLUP_COMPATIBLE;
+    return FORCE_GUARANTEED_ROLLUP_COMPATIBLE;

Review comment:
       Updated the docs to mark `numShards` as optional and added a description of the new cardinality scan phase.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -499,17 +518,62 @@ private TaskStatus runMultiPhaseParallel(TaskToolbox toolbox) throws Exception
 
   private TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Exception
   {
+    TaskState state;
+
+    if (!(ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec)) {
+      // only range and hash partitioning is supported for multiphase parallel ingestion, see runMultiPhaseParallel()
+      throw new ISE(
+          "forceGuaranteedRollup is set but partitionsSpec [%s] is not a ranged or hash partition spec.",
+          ingestionSchema.getTuningConfig().getPartitionsSpec()
+      );
+    }
+
+    final Integer numShardsOverride;
+    HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) ingestionSchema.getTuningConfig().getPartitionsSpec();
+    if (ingestionSchema.getTuningConfig().isForceGuaranteedRollup() && partitionsSpec.getNumShards() == null) {

Review comment:
       Removed the isForceGuaranteedRollup check here






[GitHub] [druid] jon-wei commented on a change in pull request #10419: Automatically determine numShards for parallel ingestion hash partitioning

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #10419:
URL: https://github.com/apache/druid/pull/10419#discussion_r493978602



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.datasketches.hll.HllSketch;
+import org.apache.druid.data.input.HandlingInputRowIterator;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
+import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
+{
+  public static final String TYPE = "partial_dimension_cardinality";
+  private static final Logger LOG = new Logger(PartialDimensionCardinalityTask.class);
+
+  private final int numAttempts;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final String supervisorTaskId;
+
+  private final ObjectMapper jsonMapper;
+
+  @JsonCreator
+  PartialDimensionCardinalityTask(
+      // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask
+      @JsonProperty("id") @Nullable String id,
+      @JsonProperty("groupId") final String groupId,
+      @JsonProperty("resource") final TaskResource taskResource,
+      @JsonProperty("supervisorTaskId") final String supervisorTaskId,
+      @JsonProperty("numAttempts") final int numAttempts, // zero-based counting
+      @JsonProperty("spec") final ParallelIndexIngestionSpec ingestionSchema,
+      @JsonProperty("context") final Map<String, Object> context,
+      @JacksonInject ObjectMapper jsonMapper
+  )
+  {
+    super(
+        getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()),
+        groupId,
+        taskResource,
+        ingestionSchema.getDataSchema(),
+        ingestionSchema.getTuningConfig(),
+        context
+    );
+
+    Preconditions.checkArgument(
+        ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec,
+        "%s partitionsSpec required",
+        HashedPartitionsSpec.NAME
+    );
+
+    this.numAttempts = numAttempts;
+    this.ingestionSchema = ingestionSchema;
+    this.supervisorTaskId = supervisorTaskId;
+    this.jsonMapper = jsonMapper;
+  }
+
+  @JsonProperty
+  private int getNumAttempts()
+  {
+    return numAttempts;
+  }
+
+  @JsonProperty("spec")
+  private ParallelIndexIngestionSpec getIngestionSchema()
+  {
+    return ingestionSchema;
+  }
+
+  @JsonProperty
+  private String getSupervisorTaskId()
+  {
+    return supervisorTaskId;
+  }
+
+  @Override
+  public String getType()
+  {
+    return TYPE;
+  }
+
+  @Override
+  public boolean isReady(TaskActionClient taskActionClient) throws Exception
+  {
+    return tryTimeChunkLock(
+        taskActionClient,
+        getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+    );
+  }
+
+  @Override
+  public TaskStatus runTask(TaskToolbox toolbox) throws Exception
+  {
+    DataSchema dataSchema = ingestionSchema.getDataSchema();
+    GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
+    ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
+
+    HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) tuningConfig.getPartitionsSpec();
+    Preconditions.checkNotNull(partitionsSpec, "partitionsSpec required in tuningConfig");
+
+    List<String> partitionDimensions = partitionsSpec.getPartitionDimensions();
+    if (partitionDimensions == null) {
+      partitionDimensions = HashBasedNumberedShardSpec.DEFAULT_PARTITION_DIMENSIONS;
+    }
+
+    InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
+        ingestionSchema.getDataSchema().getParser()
+    );
+    InputFormat inputFormat = inputSource.needsFormat()
+                              ? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema)
+                              : null;
+    final RowIngestionMeters buildSegmentsMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
+    final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+        buildSegmentsMeters,
+        tuningConfig.isLogParseExceptions(),
+        tuningConfig.getMaxParseExceptions(),
+        tuningConfig.getMaxSavedParseExceptions()
+    );
+
+    try (
+        final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
+            toolbox.getIndexingTmpDir(),
+            dataSchema,
+            inputSource,
+            inputFormat,
+            AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
+            buildSegmentsMeters,
+            parseExceptionHandler
+        );
+        HandlingInputRowIterator iterator =
+            new DefaultIndexTaskInputRowIteratorBuilder()
+                .delegate(inputRowIterator)
+                .granularitySpec(granularitySpec)
+                .build()
+    ) {
+      Map<Interval, byte[]> cardinalities = determineCardinalities(
+          iterator,
+          granularitySpec,
+          partitionDimensions
+      );
+
+      sendReport(
+          toolbox,
+          new DimensionCardinalityReport(getId(), cardinalities)
+      );
+    }
+
+    return TaskStatus.success(getId());
+  }
+
+  private Map<Interval, byte[]> determineCardinalities(
+      HandlingInputRowIterator inputRowIterator,
+      GranularitySpec granularitySpec,
+      List<String> partitionDimensions
+  )
+  {
+    Map<Interval, HllSketch> intervalToCardinalities = new HashMap<>();
+    while (inputRowIterator.hasNext()) {
+      InputRow inputRow = inputRowIterator.next();
+      if (inputRow == null) {
+        continue;
+      }
+
+      DateTime timestamp = inputRow.getTimestamp();
+      //noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals)
+      Interval interval = granularitySpec.bucketInterval(timestamp).get();
+
+      HllSketch hllSketch = intervalToCardinalities.computeIfAbsent(
+          interval,
+          (intervalKey) -> {
+            return DimensionCardinalityReport.createHllSketchForReport();
+          }
+      );
+      List<Object> groupKey = HashBasedNumberedShardSpec.getGroupKey(
+          partitionDimensions,
+          interval.getStartMillis(),

Review comment:
       Ah, thanks, I fixed this and updated `PartialDimensionCardinalityTaskTest.sendsCorrectReportWithMultipleIntervalsInData()` to test that case






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10419: Automatically determine numShards for parallel ingestion hash partitioning

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #10419:
URL: https://github.com/apache/druid/pull/10419#discussion_r493417970



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
##########
@@ -0,0 +1,254 @@
+  private Map<Interval, byte[]> determineCardinalities(
+      HandlingInputRowIterator inputRowIterator,
+      GranularitySpec granularitySpec,
+      List<String> partitionDimensions
+  )
+  {
+    Map<Interval, HllSketch> intervalToCardinalities = new HashMap<>();
+    while (inputRowIterator.hasNext()) {
+      InputRow inputRow = inputRowIterator.next();
+      if (inputRow == null) {
+        continue;
+      }
+
+      DateTime timestamp = inputRow.getTimestamp();
+      //noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals)
+      Interval interval = granularitySpec.bucketInterval(timestamp).get();
+
+      HllSketch hllSketch = intervalToCardinalities.computeIfAbsent(
+          interval,
+          (intervalKey) -> {
+            return DimensionCardinalityReport.createHllSketchForReport();
+          }
+      );
+      List<Object> groupKey = HashBasedNumberedShardSpec.getGroupKey(
+          partitionDimensions,
+          interval.getStartMillis(),
+          inputRow
+      );
+
+      try {
+        hllSketch.update(
+            IndexTask.HASH_FUNCTION.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asBytes()

Review comment:
       Out of curiosity, what is the effect if we pass `jsonMapper.writeValueAsBytes(groupKey)` directly to `hllSketch`? Does it affect performance or accuracy in any way? `hllSketch` is already doing hashing on the byte array input.






[GitHub] [druid] jon-wei commented on a change in pull request #10419: Automatically determine numShards for parallel ingestion hash partitioning

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #10419:
URL: https://github.com/apache/druid/pull/10419#discussion_r493977686



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
##########
@@ -0,0 +1,254 @@
+  private Map<Interval, byte[]> determineCardinalities(
+      HandlingInputRowIterator inputRowIterator,
+      GranularitySpec granularitySpec,
+      List<String> partitionDimensions
+  )
+  {
+    Map<Interval, HllSketch> intervalToCardinalities = new HashMap<>();
+    while (inputRowIterator.hasNext()) {
+      InputRow inputRow = inputRowIterator.next();
+      if (inputRow == null) {
+        continue;
+      }
+
+      DateTime timestamp = inputRow.getTimestamp();
+      //noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals)
+      Interval interval = granularitySpec.bucketInterval(timestamp).get();
+
+      HllSketch hllSketch = intervalToCardinalities.computeIfAbsent(
+          interval,
+          (intervalKey) -> {
+            return DimensionCardinalityReport.createHllSketchForReport();
+          }
+      );
+      List<Object> groupKey = HashBasedNumberedShardSpec.getGroupKey(
+          partitionDimensions,
+          interval.getStartMillis(),
+          inputRow
+      );
+
+      try {
+        hllSketch.update(
+            IndexTask.HASH_FUNCTION.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asBytes()

Review comment:
       Good point. I got rid of the first level of hashing there; it should be more accurate this way.
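
   To make the trade-off concrete, here is a small self-contained comparison of the two approaches discussed in this thread — pre-hashing the serialized group key before feeding it to the sketch versus letting `HllSketch` hash the raw bytes itself. The Murmur3 function below is only a stand-in for whatever `IndexTask.HASH_FUNCTION` actually is:

```java
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import org.apache.datasketches.hll.HllSketch;

import java.nio.charset.StandardCharsets;

public class HllSketchHashingComparison
{
  public static void main(String[] args)
  {
    HashFunction murmur = Hashing.murmur3_128(); // stand-in for IndexTask.HASH_FUNCTION
    HllSketch preHashed = new HllSketch(11);
    HllSketch rawBytes = new HllSketch(11);

    for (int i = 0; i < 1_000_000; i++) {
      byte[] serializedKey = ("key-" + i).getBytes(StandardCharsets.UTF_8);

      // Double hashing: Murmur3 first, then the sketch's own internal hash.
      preHashed.update(murmur.hashBytes(serializedKey).asBytes());

      // Single hashing: the sketch hashes the serialized key directly.
      rawBytes.update(serializedKey);
    }

    // Both estimates land close to 1,000,000; feeding the raw bytes avoids the
    // redundant hashing step and any accuracy loss it might introduce.
    System.out.println("pre-hashed estimate: " + preHashed.getEstimate());
    System.out.println("raw-bytes estimate:  " + rawBytes.getEstimate());
  }
}
```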

##########
File path: docs/ingestion/native-batch.md
##########
@@ -294,11 +294,17 @@ How the worker task creates segments is:
 |property|description|default|required?|
 |--------|-----------|-------|---------|
 |type|This should always be `hashed`|none|yes|
-|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data.|null|yes|
+|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data.|null|no|

Review comment:
       Added docs for `targetRowsPerSegment`






[GitHub] [druid] jihoonson commented on pull request #10419: Automatically determine numShards for parallel ingestion hash partitioning

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #10419:
URL: https://github.com/apache/druid/pull/10419#issuecomment-698508270


   One thing to note: after this PR, the parallel task can compute the number of partitions automatically, but the same number will be applied to all intervals. I think it would be better to compute and apply a different number of partitions per interval to handle potential data skew between intervals, which is what the simple task (`IndexTask`) does. But I'm OK with doing this improvement as a follow-up.
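
   As a rough sketch of that follow-up idea (the interval keys, target value, and rounding below are illustrative, not the actual Druid implementation):

```java
import java.util.HashMap;
import java.util.Map;

public class NumShardsPerIntervalExample
{
  // Behavior described above: one shard count applied to every interval,
  // derived from the highest per-interval cardinality estimate.
  static int globalNumShards(Map<String, Long> cardinalityPerInterval, long targetRowsPerSegment)
  {
    long maxCardinality = cardinalityPerInterval.values().stream()
        .mapToLong(Long::longValue)
        .max()
        .orElse(0L);
    return (int) Math.max(1, (maxCardinality + targetRowsPerSegment - 1) / targetRowsPerSegment);
  }

  // Suggested follow-up: a shard count per interval, so sparse intervals are
  // not over-partitioned just because one interval is skewed.
  static Map<String, Integer> perIntervalNumShards(Map<String, Long> cardinalityPerInterval, long targetRowsPerSegment)
  {
    Map<String, Integer> result = new HashMap<>();
    cardinalityPerInterval.forEach(
        (interval, cardinality) ->
            result.put(interval, (int) Math.max(1, (cardinality + targetRowsPerSegment - 1) / targetRowsPerSegment))
    );
    return result;
  }

  public static void main(String[] args)
  {
    Map<String, Long> cardinalities = new HashMap<>();
    cardinalities.put("2020-01-01/2020-01-02", 5_000_000L);
    cardinalities.put("2020-01-02/2020-01-03", 40_000L);

    System.out.println("global numShards: " + globalNumShards(cardinalities, 1_000_000L));      // 5 for both intervals
    System.out.println("per-interval:     " + perIntervalNumShards(cardinalities, 1_000_000L)); // 5 and 1
  }
}
```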




[GitHub] [druid] jon-wei merged pull request #10419: Automatically determine numShards for parallel ingestion hash partitioning

Posted by GitBox <gi...@apache.org>.
jon-wei merged pull request #10419:
URL: https://github.com/apache/druid/pull/10419


   




[GitHub] [druid] jon-wei commented on pull request #10419: Automatically determine numShards for parallel ingestion hash partitioning

Posted by GitBox <gi...@apache.org>.
jon-wei commented on pull request #10419:
URL: https://github.com/apache/druid/pull/10419#issuecomment-698047996


   I added the locking back on PartialHashSegmentGenerateTask to address https://github.com/apache/druid/pull/10419#discussion_r493899887




[GitHub] [druid] jihoonson commented on a change in pull request #10419: Automatically determine numShards for parallel ingestion hash partitioning

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10419:
URL: https://github.com/apache/druid/pull/10419#discussion_r493891466



##########
File path: docs/ingestion/native-batch.md
##########
@@ -294,11 +294,17 @@ How the worker task creates segments is:
 |property|description|default|required?|
 |--------|-----------|-------|---------|
 |type|This should always be `hashed`|none|yes|
-|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data.|null|yes|
+|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data.|null|no|
 |partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions.|null|no|
 
 The Parallel task with hash-based partitioning is similar to [MapReduce](https://en.wikipedia.org/wiki/MapReduce).
-The task runs in 2 phases, i.e., `partial segment generation` and `partial segment merge`.
+The task runs in up to 3 phases: `partial_dimension_cardinality`, `partial segment generation` and `partial segment merge`.
+- The `partial_dimension_cardinality` phase is an optional phase that only runs if `numShards` is not specified.
+The Parallel task splits the input data and assigns them to worker tasks based on the split hint spec.
+Each worker task (type `partial_dimension_cardinality`) gathers estimates of partitioning dimensions cardinality for
+each time chunk. The Parallel task will aggregate these estimates from the worker tasks and determine the highest
+cardinality across all of the time chunks in the input data, dividing this cardinality by `maxRowsPerSegment` to

Review comment:
       It seems like this will guarantee that you will not have more segments than the computed `numShards`, but it will not guarantee that the number of rows per segment doesn't exceed `maxRowsPerSegment`, since the partition dimensions can be skewed. Is this correct? If so, I suggest `targetRowsPerSegment` since it's not a hard limit.
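
   A small made-up illustration of the point: the computed shard count bounds how many segments are created, but hash partitioning on a skewed dimension can still push one shard far past an even per-segment row count, which is why "target" describes the setting better than "max":

```java
import java.util.Objects;

public class SkewedPartitionExample
{
  public static void main(String[] args)
  {
    int numShards = 4; // however it was derived from the cardinality estimate
    long[] rowsPerShard = new long[numShards];

    // 1,000,000 rows, but 70% of them share one partition-dimension value.
    for (int i = 0; i < 1_000_000; i++) {
      String dimValue = (i % 10 < 7) ? "popular" : "value-" + (i % 10);
      int shard = Math.floorMod(Objects.hash(dimValue), numShards);
      rowsPerShard[shard]++;
    }

    // The shard that receives the popular value holds at least ~700k rows,
    // well above the even split of 250k per shard.
    for (int shard = 0; shard < numShards; shard++) {
      System.out.println("shard " + shard + ": " + rowsPerShard[shard] + " rows");
    }
  }
}
```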

##########
File path: docs/ingestion/native-batch.md
##########
@@ -294,11 +294,17 @@ How the worker task creates segments is:
 |property|description|default|required?|
 |--------|-----------|-------|---------|
 |type|This should always be `hashed`|none|yes|
-|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data.|null|yes|
+|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data.|null|no|

Review comment:
       Please add the new parameter and its description.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.datasketches.hll.HllSketch;
+import org.apache.druid.data.input.HandlingInputRowIterator;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
+import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
+{
+  public static final String TYPE = "partial_dimension_cardinality";
+  private static final Logger LOG = new Logger(PartialDimensionCardinalityTask.class);
+
+  private final int numAttempts;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final String supervisorTaskId;
+
+  private final ObjectMapper jsonMapper;
+
+  @JsonCreator
+  PartialDimensionCardinalityTask(
+      // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask
+      @JsonProperty("id") @Nullable String id,
+      @JsonProperty("groupId") final String groupId,
+      @JsonProperty("resource") final TaskResource taskResource,
+      @JsonProperty("supervisorTaskId") final String supervisorTaskId,
+      @JsonProperty("numAttempts") final int numAttempts, // zero-based counting
+      @JsonProperty("spec") final ParallelIndexIngestionSpec ingestionSchema,
+      @JsonProperty("context") final Map<String, Object> context,
+      @JacksonInject ObjectMapper jsonMapper
+  )
+  {
+    super(
+        getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()),
+        groupId,
+        taskResource,
+        ingestionSchema.getDataSchema(),
+        ingestionSchema.getTuningConfig(),
+        context
+    );
+
+    Preconditions.checkArgument(
+        ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec,
+        "%s partitionsSpec required",
+        HashedPartitionsSpec.NAME
+    );
+
+    this.numAttempts = numAttempts;
+    this.ingestionSchema = ingestionSchema;
+    this.supervisorTaskId = supervisorTaskId;
+    this.jsonMapper = jsonMapper;
+  }
+
+  @JsonProperty
+  private int getNumAttempts()
+  {
+    return numAttempts;
+  }
+
+  @JsonProperty("spec")
+  private ParallelIndexIngestionSpec getIngestionSchema()
+  {
+    return ingestionSchema;
+  }
+
+  @JsonProperty
+  private String getSupervisorTaskId()
+  {
+    return supervisorTaskId;
+  }
+
+  @Override
+  public String getType()
+  {
+    return TYPE;
+  }
+
+  @Override
+  public boolean isReady(TaskActionClient taskActionClient) throws Exception
+  {
+    return tryTimeChunkLock(
+        taskActionClient,
+        getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+    );
+  }
+
+  @Override
+  public TaskStatus runTask(TaskToolbox toolbox) throws Exception
+  {
+    DataSchema dataSchema = ingestionSchema.getDataSchema();
+    GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
+    ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
+
+    HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) tuningConfig.getPartitionsSpec();
+    Preconditions.checkNotNull(partitionsSpec, "partitionsSpec required in tuningConfig");
+
+    List<String> partitionDimensions = partitionsSpec.getPartitionDimensions();
+    if (partitionDimensions == null) {
+      partitionDimensions = HashBasedNumberedShardSpec.DEFAULT_PARTITION_DIMENSIONS;
+    }
+
+    InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
+        ingestionSchema.getDataSchema().getParser()
+    );
+    InputFormat inputFormat = inputSource.needsFormat()
+                              ? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema)
+                              : null;
+    final RowIngestionMeters buildSegmentsMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
+    final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+        buildSegmentsMeters,
+        tuningConfig.isLogParseExceptions(),
+        tuningConfig.getMaxParseExceptions(),
+        tuningConfig.getMaxSavedParseExceptions()
+    );
+
+    try (
+        final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
+            toolbox.getIndexingTmpDir(),
+            dataSchema,
+            inputSource,
+            inputFormat,
+            AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
+            buildSegmentsMeters,
+            parseExceptionHandler
+        );
+        HandlingInputRowIterator iterator =
+            new DefaultIndexTaskInputRowIteratorBuilder()

Review comment:
       nit: `DefaultIndexTaskInputRowIteratorBuilder` effectively does nothing here since its core functionality was moved to `FilteringCloseableInputRowIterator` in #10336 (I haven't cleaned up this interface yet). It's now only useful in range partitioning, where additional inputRowHandlers are appended to the default builder.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.datasketches.hll.HllSketch;
+import org.apache.druid.data.input.HandlingInputRowIterator;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
+import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
+{
+  public static final String TYPE = "partial_dimension_cardinality";
+  private static final Logger LOG = new Logger(PartialDimensionCardinalityTask.class);
+
+  private final int numAttempts;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final String supervisorTaskId;
+
+  private final ObjectMapper jsonMapper;
+
+  @JsonCreator
+  PartialDimensionCardinalityTask(
+      // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask
+      @JsonProperty("id") @Nullable String id,
+      @JsonProperty("groupId") final String groupId,
+      @JsonProperty("resource") final TaskResource taskResource,
+      @JsonProperty("supervisorTaskId") final String supervisorTaskId,
+      @JsonProperty("numAttempts") final int numAttempts, // zero-based counting
+      @JsonProperty("spec") final ParallelIndexIngestionSpec ingestionSchema,
+      @JsonProperty("context") final Map<String, Object> context,
+      @JacksonInject ObjectMapper jsonMapper
+  )
+  {
+    super(
+        getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()),
+        groupId,
+        taskResource,
+        ingestionSchema.getDataSchema(),
+        ingestionSchema.getTuningConfig(),
+        context
+    );
+
+    Preconditions.checkArgument(
+        ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec,
+        "%s partitionsSpec required",
+        HashedPartitionsSpec.NAME
+    );
+
+    this.numAttempts = numAttempts;
+    this.ingestionSchema = ingestionSchema;
+    this.supervisorTaskId = supervisorTaskId;
+    this.jsonMapper = jsonMapper;
+  }
+
+  @JsonProperty
+  private int getNumAttempts()
+  {
+    return numAttempts;
+  }
+
+  @JsonProperty("spec")
+  private ParallelIndexIngestionSpec getIngestionSchema()
+  {
+    return ingestionSchema;
+  }
+
+  @JsonProperty
+  private String getSupervisorTaskId()
+  {
+    return supervisorTaskId;
+  }
+
+  @Override
+  public String getType()
+  {
+    return TYPE;
+  }
+
+  @Override
+  public boolean isReady(TaskActionClient taskActionClient) throws Exception
+  {
+    return tryTimeChunkLock(
+        taskActionClient,
+        getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+    );
+  }
+
+  @Override
+  public TaskStatus runTask(TaskToolbox toolbox) throws Exception
+  {
+    DataSchema dataSchema = ingestionSchema.getDataSchema();
+    GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
+    ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
+
+    HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) tuningConfig.getPartitionsSpec();
+    Preconditions.checkNotNull(partitionsSpec, "partitionsSpec required in tuningConfig");
+
+    List<String> partitionDimensions = partitionsSpec.getPartitionDimensions();
+    if (partitionDimensions == null) {
+      partitionDimensions = HashBasedNumberedShardSpec.DEFAULT_PARTITION_DIMENSIONS;
+    }
+
+    InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
+        ingestionSchema.getDataSchema().getParser()
+    );
+    InputFormat inputFormat = inputSource.needsFormat()
+                              ? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema)
+                              : null;
+    final RowIngestionMeters buildSegmentsMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
+    final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+        buildSegmentsMeters,
+        tuningConfig.isLogParseExceptions(),
+        tuningConfig.getMaxParseExceptions(),
+        tuningConfig.getMaxSavedParseExceptions()
+    );
+
+    try (
+        final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
+            toolbox.getIndexingTmpDir(),
+            dataSchema,
+            inputSource,
+            inputFormat,
+            AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
+            buildSegmentsMeters,
+            parseExceptionHandler
+        );
+        HandlingInputRowIterator iterator =
+            new DefaultIndexTaskInputRowIteratorBuilder()
+                .delegate(inputRowIterator)
+                .granularitySpec(granularitySpec)
+                .build()
+    ) {
+      Map<Interval, byte[]> cardinalities = determineCardinalities(
+          iterator,
+          granularitySpec,
+          partitionDimensions
+      );
+
+      sendReport(
+          toolbox,
+          new DimensionCardinalityReport(getId(), cardinalities)
+      );
+    }
+
+    return TaskStatus.success(getId());
+  }
+
+  private Map<Interval, byte[]> determineCardinalities(
+      HandlingInputRowIterator inputRowIterator,
+      GranularitySpec granularitySpec,
+      List<String> partitionDimensions
+  )
+  {
+    Map<Interval, HllSketch> intervalToCardinalities = new HashMap<>();
+    while (inputRowIterator.hasNext()) {
+      InputRow inputRow = inputRowIterator.next();
+      if (inputRow == null) {
+        continue;
+      }
+
+      DateTime timestamp = inputRow.getTimestamp();
+      //noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals)
+      Interval interval = granularitySpec.bucketInterval(timestamp).get();
+
+      HllSketch hllSketch = intervalToCardinalities.computeIfAbsent(
+          interval,
+          (intervalKey) -> {
+            return DimensionCardinalityReport.createHllSketchForReport();
+          }
+      );
+      List<Object> groupKey = HashBasedNumberedShardSpec.getGroupKey(
+          partitionDimensions,
+          interval.getStartMillis(),

Review comment:
       This timestamp should be bucketed based on the query granularity. See https://github.com/apache/druid/blob/master/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java#L146.
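       (A minimal sketch of the suggested change; it assumes the query granularity is reachable here via `granularitySpec.getQueryGranularity()`, mirroring the linked `HashBasedNumberedShardSpec` code.)

       ```java
       // Bucket the row timestamp by query granularity before building the group key,
       // so the hash computed here matches what HashBasedNumberedShardSpec computes.
       long bucketedTimestamp = granularitySpec.getQueryGranularity()
                                               .bucketStart(timestamp)
                                               .getMillis();
       List<Object> groupKey = HashBasedNumberedShardSpec.getGroupKey(
           partitionDimensions,
           bucketedTimestamp,
           inputRow
       );
       ```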

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -582,6 +652,50 @@ private TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) thro
     return TaskStatus.fromCode(getId(), mergeState);
   }
 
+  @VisibleForTesting
+  public static int determineNumShardsFromCardinalityReport(
+      Collection<DimensionCardinalityReport> reports,
+      int maxRowsPerSegment
+  )
+  {
+    // aggregate all the sub-reports
+    Map<Interval, Union> finalCollectors = new HashMap<>();
+    reports.forEach(report -> {
+      Map<Interval, byte[]> intervalToCardinality = report.getIntervalToCardinalities();
+      for (Map.Entry<Interval, byte[]> entry : intervalToCardinality.entrySet()) {
+        Union union = finalCollectors.computeIfAbsent(
+            entry.getKey(),
+            (key) -> {
+              return new Union(DimensionCardinalityReport.HLL_SKETCH_LOG_K);
+            }
+        );
+        HllSketch entryHll = HllSketch.wrap(Memory.wrap(entry.getValue()));
+        union.update(entryHll);
+      }
+    });
+
+    // determine the highest cardinality in any interval
+    long maxCardinality = Long.MIN_VALUE;
+    for (Union union : finalCollectors.values()) {
+      maxCardinality = Math.max(maxCardinality, (long) union.getEstimate());
+    }
+
+    LOG.info("Estimated max cardinality: " + maxCardinality);
+
+    // determine numShards based on maxRowsPerSegment and the highest per-interval cardinality
+    long numShards = maxCardinality / maxRowsPerSegment;
+    if (maxCardinality % maxRowsPerSegment != 0) {
+      // if there's a remainder add 1 so we stay under maxRowsPerSegment
+      numShards += 1;
+    }
+    try {
+      return Math.toIntExact(numShards);
+    }
+    catch (ArithmeticException ae) {
+      return Integer.MAX_VALUE;

Review comment:
       Hmm, should we fail instead? The timeline in the Coordinator and the Broker will explode if we end up with this many segments per interval.
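       (If failing is the preferred behavior, a sketch of one way to do it; `ISE` is already used elsewhere in this class. The message wording is hypothetical.)

       ```java
       try {
         return Math.toIntExact(numShards);
       }
       catch (ArithmeticException ae) {
         // Fail fast instead of clamping: this many segments per interval would
         // overwhelm the Coordinator/Broker timeline anyway.
         throw new ISE("Estimated numShards [%s] exceeds integer range; check maxRowsPerSegment", numShards);
       }
       ```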

##########
File path: docs/ingestion/native-batch.md
##########
@@ -294,11 +294,17 @@ How the worker task creates segments is:
 |property|description|default|required?|
 |--------|-----------|-------|---------|
 |type|This should always be `hashed`|none|yes|
-|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data.|null|yes|
+|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data.|null|no|
 |partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions.|null|no|
 
 The Parallel task with hash-based partitioning is similar to [MapReduce](https://en.wikipedia.org/wiki/MapReduce).
-The task runs in 2 phases, i.e., `partial segment generation` and `partial segment merge`.
+The task runs in up to 3 phases: `partial_dimension_cardinality`, `partial segment generation` and `partial segment merge`.

Review comment:
       When I wrote this, my intention was to show the names of the phases (e.g., https://github.com/apache/druid/pull/10419/files#diff-05bbc55d565a3d9462353d9b4771cb09R34). These phase names are not shown anywhere currently, but will be available in the task live reports and metrics after #10352. How about removing the underscores here too?

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.HandlingInputRowIterator;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
+import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
+{
+  public static final String TYPE = "partial_dimension_cardinality";
+  private static final Logger LOG = new Logger(PartialDimensionCardinalityTask.class);
+
+  private final int numAttempts;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final String supervisorTaskId;
+
+  private final ObjectMapper jsonMapper;
+
+  @JsonCreator
+  PartialDimensionCardinalityTask(
+      // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask
+      @JsonProperty("id") @Nullable String id,
+      @JsonProperty("groupId") final String groupId,
+      @JsonProperty("resource") final TaskResource taskResource,
+      @JsonProperty("supervisorTaskId") final String supervisorTaskId,
+      @JsonProperty("numAttempts") final int numAttempts, // zero-based counting
+      @JsonProperty("spec") final ParallelIndexIngestionSpec ingestionSchema,
+      @JsonProperty("context") final Map<String, Object> context,
+      @JacksonInject ObjectMapper jsonMapper
+  )
+  {
+    super(
+        getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()),
+        groupId,
+        taskResource,
+        ingestionSchema.getDataSchema(),
+        ingestionSchema.getTuningConfig(),
+        context
+    );
+
+    Preconditions.checkArgument(
+        ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec,
+        "%s partitionsSpec required",
+        HashedPartitionsSpec.NAME
+    );
+
+    this.numAttempts = numAttempts;
+    this.ingestionSchema = ingestionSchema;
+    this.supervisorTaskId = supervisorTaskId;
+    this.jsonMapper = jsonMapper;
+  }
+
+  @JsonProperty
+  private int getNumAttempts()
+  {
+    return numAttempts;
+  }
+
+  @JsonProperty("spec")
+  private ParallelIndexIngestionSpec getIngestionSchema()
+  {
+    return ingestionSchema;
+  }
+
+  @JsonProperty
+  private String getSupervisorTaskId()
+  {
+    return supervisorTaskId;
+  }
+
+  @Override
+  public String getType()
+  {
+    return TYPE;
+  }
+
+  @Override
+  public boolean isReady(TaskActionClient taskActionClient) throws Exception
+  {
+    return tryTimeChunkLock(
+        taskActionClient,
+        getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+    );
+  }

Review comment:
       I think acquiring locks here should be fine since it's idempotent. The supervisor task and all its subtasks share the same lock based on their groupId. I actually think it's better to call `tryTimeChunkLock()` in every subtask since it makes the task fail early when its lock is revoked. Otherwise, the task only fails when it publishes segments, which happens in the last stage of batch ingestion.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.datasketches.hll.HllSketch;
+import org.apache.druid.data.input.HandlingInputRowIterator;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
+import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
+{
+  public static final String TYPE = "partial_dimension_cardinality";
+  private static final Logger LOG = new Logger(PartialDimensionCardinalityTask.class);
+
+  private final int numAttempts;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final String supervisorTaskId;
+
+  private final ObjectMapper jsonMapper;
+
+  @JsonCreator
+  PartialDimensionCardinalityTask(
+      // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask
+      @JsonProperty("id") @Nullable String id,
+      @JsonProperty("groupId") final String groupId,
+      @JsonProperty("resource") final TaskResource taskResource,
+      @JsonProperty("supervisorTaskId") final String supervisorTaskId,
+      @JsonProperty("numAttempts") final int numAttempts, // zero-based counting
+      @JsonProperty("spec") final ParallelIndexIngestionSpec ingestionSchema,
+      @JsonProperty("context") final Map<String, Object> context,
+      @JacksonInject ObjectMapper jsonMapper
+  )
+  {
+    super(
+        getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()),
+        groupId,
+        taskResource,
+        ingestionSchema.getDataSchema(),
+        ingestionSchema.getTuningConfig(),
+        context
+    );
+
+    Preconditions.checkArgument(
+        ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec,
+        "%s partitionsSpec required",
+        HashedPartitionsSpec.NAME
+    );
+
+    this.numAttempts = numAttempts;
+    this.ingestionSchema = ingestionSchema;
+    this.supervisorTaskId = supervisorTaskId;
+    this.jsonMapper = jsonMapper;
+  }
+
+  @JsonProperty
+  private int getNumAttempts()
+  {
+    return numAttempts;
+  }
+
+  @JsonProperty("spec")
+  private ParallelIndexIngestionSpec getIngestionSchema()
+  {
+    return ingestionSchema;
+  }
+
+  @JsonProperty
+  private String getSupervisorTaskId()
+  {
+    return supervisorTaskId;
+  }
+
+  @Override
+  public String getType()
+  {
+    return TYPE;
+  }
+
+  @Override
+  public boolean isReady(TaskActionClient taskActionClient) throws Exception
+  {
+    return tryTimeChunkLock(
+        taskActionClient,
+        getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+    );
+  }
+
+  @Override
+  public TaskStatus runTask(TaskToolbox toolbox) throws Exception
+  {
+    DataSchema dataSchema = ingestionSchema.getDataSchema();
+    GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
+    ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
+
+    HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) tuningConfig.getPartitionsSpec();
+    Preconditions.checkNotNull(partitionsSpec, "partitionsSpec required in tuningConfig");
+
+    List<String> partitionDimensions = partitionsSpec.getPartitionDimensions();
+    if (partitionDimensions == null) {
+      partitionDimensions = HashBasedNumberedShardSpec.DEFAULT_PARTITION_DIMENSIONS;
+    }
+
+    InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
+        ingestionSchema.getDataSchema().getParser()
+    );
+    InputFormat inputFormat = inputSource.needsFormat()
+                              ? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema)
+                              : null;
+    final RowIngestionMeters buildSegmentsMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
+    final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+        buildSegmentsMeters,
+        tuningConfig.isLogParseExceptions(),
+        tuningConfig.getMaxParseExceptions(),
+        tuningConfig.getMaxSavedParseExceptions()
+    );
+
+    try (
+        final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
+            toolbox.getIndexingTmpDir(),
+            dataSchema,
+            inputSource,
+            inputFormat,
+            AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
+            buildSegmentsMeters,
+            parseExceptionHandler
+        );
+        HandlingInputRowIterator iterator =
+            new DefaultIndexTaskInputRowIteratorBuilder()
+                .delegate(inputRowIterator)
+                .granularitySpec(granularitySpec)
+                .build()
+    ) {
+      Map<Interval, byte[]> cardinalities = determineCardinalities(
+          iterator,
+          granularitySpec,
+          partitionDimensions
+      );
+
+      sendReport(
+          toolbox,
+          new DimensionCardinalityReport(getId(), cardinalities)
+      );
+    }
+
+    return TaskStatus.success(getId());
+  }
+
+  private Map<Interval, byte[]> determineCardinalities(
+      HandlingInputRowIterator inputRowIterator,
+      GranularitySpec granularitySpec,
+      List<String> partitionDimensions
+  )
+  {
+    Map<Interval, HllSketch> intervalToCardinalities = new HashMap<>();
+    while (inputRowIterator.hasNext()) {
+      InputRow inputRow = inputRowIterator.next();
+      if (inputRow == null) {

Review comment:
       nit: `inputRow` is safely non-null since `FilteringCloseableInputRowIterator` filters out all null rows. See `AbstractBatchIndexTask.defaultRowFilter()`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jon-wei commented on a change in pull request #10419: Automatically determine numShards for parallel ingestion hash partitioning

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #10419:
URL: https://github.com/apache/druid/pull/10419#discussion_r493978191



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.datasketches.hll.HllSketch;
+import org.apache.druid.data.input.HandlingInputRowIterator;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
+import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
+{
+  public static final String TYPE = "partial_dimension_cardinality";
+  private static final Logger LOG = new Logger(PartialDimensionCardinalityTask.class);
+
+  private final int numAttempts;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final String supervisorTaskId;
+
+  private final ObjectMapper jsonMapper;
+
+  @JsonCreator
+  PartialDimensionCardinalityTask(
+      // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask
+      @JsonProperty("id") @Nullable String id,
+      @JsonProperty("groupId") final String groupId,
+      @JsonProperty("resource") final TaskResource taskResource,
+      @JsonProperty("supervisorTaskId") final String supervisorTaskId,
+      @JsonProperty("numAttempts") final int numAttempts, // zero-based counting
+      @JsonProperty("spec") final ParallelIndexIngestionSpec ingestionSchema,
+      @JsonProperty("context") final Map<String, Object> context,
+      @JacksonInject ObjectMapper jsonMapper
+  )
+  {
+    super(
+        getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()),
+        groupId,
+        taskResource,
+        ingestionSchema.getDataSchema(),
+        ingestionSchema.getTuningConfig(),
+        context
+    );
+
+    Preconditions.checkArgument(
+        ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec,
+        "%s partitionsSpec required",
+        HashedPartitionsSpec.NAME
+    );
+
+    this.numAttempts = numAttempts;
+    this.ingestionSchema = ingestionSchema;
+    this.supervisorTaskId = supervisorTaskId;
+    this.jsonMapper = jsonMapper;
+  }
+
+  @JsonProperty
+  private int getNumAttempts()
+  {
+    return numAttempts;
+  }
+
+  @JsonProperty("spec")
+  private ParallelIndexIngestionSpec getIngestionSchema()
+  {
+    return ingestionSchema;
+  }
+
+  @JsonProperty
+  private String getSupervisorTaskId()
+  {
+    return supervisorTaskId;
+  }
+
+  @Override
+  public String getType()
+  {
+    return TYPE;
+  }
+
+  @Override
+  public boolean isReady(TaskActionClient taskActionClient) throws Exception
+  {
+    return tryTimeChunkLock(
+        taskActionClient,
+        getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+    );
+  }
+
+  @Override
+  public TaskStatus runTask(TaskToolbox toolbox) throws Exception
+  {
+    DataSchema dataSchema = ingestionSchema.getDataSchema();
+    GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
+    ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
+
+    HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) tuningConfig.getPartitionsSpec();
+    Preconditions.checkNotNull(partitionsSpec, "partitionsSpec required in tuningConfig");
+
+    List<String> partitionDimensions = partitionsSpec.getPartitionDimensions();
+    if (partitionDimensions == null) {
+      partitionDimensions = HashBasedNumberedShardSpec.DEFAULT_PARTITION_DIMENSIONS;
+    }
+
+    InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
+        ingestionSchema.getDataSchema().getParser()
+    );
+    InputFormat inputFormat = inputSource.needsFormat()
+                              ? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema)
+                              : null;
+    final RowIngestionMeters buildSegmentsMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
+    final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+        buildSegmentsMeters,
+        tuningConfig.isLogParseExceptions(),
+        tuningConfig.getMaxParseExceptions(),
+        tuningConfig.getMaxSavedParseExceptions()
+    );
+
+    try (
+        final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
+            toolbox.getIndexingTmpDir(),
+            dataSchema,
+            inputSource,
+            inputFormat,
+            AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
+            buildSegmentsMeters,
+            parseExceptionHandler
+        );
+        HandlingInputRowIterator iterator =
+            new DefaultIndexTaskInputRowIteratorBuilder()

Review comment:
       I changed this to use the iterator above directly.
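       (Roughly, the simplified version looks like the following, with `determineCardinalities()` updated to accept the `CloseableIterator`; see the PR for the exact final code.)

       ```java
       try (
           final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
               toolbox.getIndexingTmpDir(),
               dataSchema,
               inputSource,
               inputFormat,
               AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
               buildSegmentsMeters,
               parseExceptionHandler
           )
       ) {
         // determineCardinalities() consumes the CloseableIterator directly;
         // no DefaultIndexTaskInputRowIteratorBuilder wrapper is needed.
         Map<Interval, byte[]> cardinalities = determineCardinalities(
             inputRowIterator,
             granularitySpec,
             partitionDimensions
         );
         sendReport(toolbox, new DimensionCardinalityReport(getId(), cardinalities));
       }
       ```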




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] ccaominh commented on a change in pull request #10419: Automatically determine numShards for parallel ingestion hash partitioning

Posted by GitBox <gi...@apache.org>.
ccaominh commented on a change in pull request #10419:
URL: https://github.com/apache/druid/pull/10419#discussion_r493092529



##########
File path: core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java
##########
@@ -160,7 +160,7 @@ public Integer getNumShards()
   @Override
   public String getForceGuaranteedRollupIncompatiblityReason()
   {
-    return getNumShards() == null ? NUM_SHARDS + " must be specified" : FORCE_GUARANTEED_ROLLUP_COMPATIBLE;
+    return FORCE_GUARANTEED_ROLLUP_COMPATIBLE;

Review comment:
       https://github.com/apache/druid/blob/master/docs/ingestion/native-batch.md#hash-based-partitioning needs to be updated to say that `numShards` is no longer required and also to mention the new `partial dimension cardinality` task.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jon-wei commented on a change in pull request #10419: Automatically determine numShards for parallel ingestion hash partitioning

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #10419:
URL: https://github.com/apache/druid/pull/10419#discussion_r493118119



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.HandlingInputRowIterator;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
+import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
+{
+  public static final String TYPE = "partial_dimension_cardinality";
+  private static final Logger LOG = new Logger(PartialDimensionCardinalityTask.class);
+
+  private final int numAttempts;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final String supervisorTaskId;
+
+  private final ObjectMapper jsonMapper;
+
+  @JsonCreator
+  PartialDimensionCardinalityTask(
+      // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask
+      @JsonProperty("id") @Nullable String id,
+      @JsonProperty("groupId") final String groupId,
+      @JsonProperty("resource") final TaskResource taskResource,
+      @JsonProperty("supervisorTaskId") final String supervisorTaskId,
+      @JsonProperty("numAttempts") final int numAttempts, // zero-based counting
+      @JsonProperty("spec") final ParallelIndexIngestionSpec ingestionSchema,
+      @JsonProperty("context") final Map<String, Object> context,
+      @JacksonInject ObjectMapper jsonMapper
+  )
+  {
+    super(
+        getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()),
+        groupId,
+        taskResource,
+        ingestionSchema.getDataSchema(),
+        ingestionSchema.getTuningConfig(),
+        context
+    );
+
+    Preconditions.checkArgument(
+        ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec,
+        "%s partitionsSpec required",
+        HashedPartitionsSpec.NAME
+    );
+
+    this.numAttempts = numAttempts;
+    this.ingestionSchema = ingestionSchema;
+    this.supervisorTaskId = supervisorTaskId;
+    this.jsonMapper = jsonMapper;
+  }
+
+  @JsonProperty
+  private int getNumAttempts()
+  {
+    return numAttempts;
+  }
+
+  @JsonProperty("spec")
+  private ParallelIndexIngestionSpec getIngestionSchema()
+  {
+    return ingestionSchema;
+  }
+
+  @JsonProperty
+  private String getSupervisorTaskId()
+  {
+    return supervisorTaskId;
+  }
+
+  @Override
+  public String getType()
+  {
+    return TYPE;
+  }
+
+  @Override
+  public boolean isReady(TaskActionClient taskActionClient) throws Exception
+  {
+    return tryTimeChunkLock(
+        taskActionClient,
+        getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+    );
+  }
+
+  @Override
+  public TaskStatus runTask(TaskToolbox toolbox) throws Exception
+  {
+    DataSchema dataSchema = ingestionSchema.getDataSchema();
+    GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
+    ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
+
+    HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) tuningConfig.getPartitionsSpec();
+    Preconditions.checkNotNull(partitionsSpec, "partitionsSpec required in tuningConfig");
+
+    List<String> partitionDimensions = partitionsSpec.getPartitionDimensions();
+    if (partitionDimensions == null) {
+      partitionDimensions = HashBasedNumberedShardSpec.DEFAULT_PARTITION_DIMENSIONS;
+    }
+
+    InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
+        ingestionSchema.getDataSchema().getParser()
+    );
+    InputFormat inputFormat = inputSource.needsFormat()
+                              ? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema)
+                              : null;
+    final RowIngestionMeters buildSegmentsMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
+    final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+        buildSegmentsMeters,
+        tuningConfig.isLogParseExceptions(),
+        tuningConfig.getMaxParseExceptions(),
+        tuningConfig.getMaxSavedParseExceptions()
+    );
+
+    try (
+        final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
+            toolbox.getIndexingTmpDir(),
+            dataSchema,
+            inputSource,
+            inputFormat,
+            AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
+            buildSegmentsMeters,
+            parseExceptionHandler
+        );
+        HandlingInputRowIterator iterator =
+            new DefaultIndexTaskInputRowIteratorBuilder()
+                .delegate(inputRowIterator)
+                .granularitySpec(granularitySpec)
+                .build()
+    ) {
+      Map<Interval, byte[]> cardinalities = determineCardinalities(
+          iterator,
+          granularitySpec,
+          partitionDimensions
+      );
+
+      sendReport(
+          toolbox,
+          new DimensionCardinalityReport(getId(), cardinalities)
+      );
+    }
+
+    return TaskStatus.success(getId());
+  }
+
+  private Map<Interval, byte[]> determineCardinalities(
+      HandlingInputRowIterator inputRowIterator,
+      GranularitySpec granularitySpec,
+      List<String> partitionDimensions
+  )
+  {
+    Map<Interval, HyperLogLogCollector> intervalToCardinalities = new HashMap<>();
+    while (inputRowIterator.hasNext()) {
+      InputRow inputRow = inputRowIterator.next();
+      if (inputRow == null) {
+        continue;
+      }
+
+      DateTime timestamp = inputRow.getTimestamp();
+
+      //noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals)
+      Interval interval = granularitySpec.bucketInterval(timestamp).get();
+
+      LOG.info("TS: " + timestamp + " INTV: " + interval + " GSC: " + granularitySpec.getClass());

Review comment:
       I'll remove this; it was a debugging-only message that shouldn't be retained.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] ccaominh commented on a change in pull request #10419: Automatically determine numShards for parallel ingestion hash partitioning

Posted by GitBox <gi...@apache.org>.
ccaominh commented on a change in pull request #10419:
URL: https://github.com/apache/druid/pull/10419#discussion_r493077626



##########
File path: core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java
##########
@@ -160,7 +160,7 @@ public static int hash(ObjectMapper jsonMapper, List<String> partitionDimensions
   }
 
   @VisibleForTesting
-  static List<Object> getGroupKey(final List<String> partitionDimensions, final long timestamp, final InputRow inputRow)
+  public static List<Object> getGroupKey(final List<String> partitionDimensions, final long timestamp, final InputRow inputRow)

Review comment:
       Does it make sense to keep the `@VisibleForTesting` since this is now public (so that it can be used in `PartialDimensionCardinalityTask`)?

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -499,17 +518,62 @@ private TaskStatus runMultiPhaseParallel(TaskToolbox toolbox) throws Exception
 
   private TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Exception
   {
+    TaskState state;
+
+    if (!(ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec)) {
+      // only range and hash partitioning is supported for multiphase parallel ingestion, see runMultiPhaseParallel()
+      throw new ISE(
+          "forceGuaranteedRollup is set but partitionsSpec [%s] is not a ranged or hash partition spec.",

Review comment:
       Using "single_dim" instead of "ranged" in the error message maps better to the naming in the docs

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.HandlingInputRowIterator;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
+import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
+{
+  public static final String TYPE = "partial_dimension_cardinality";
+  private static final Logger LOG = new Logger(PartialDimensionCardinalityTask.class);
+
+  private final int numAttempts;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final String supervisorTaskId;
+
+  private final ObjectMapper jsonMapper;
+
+  @JsonCreator
+  PartialDimensionCardinalityTask(
+      // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask
+      @JsonProperty("id") @Nullable String id,
+      @JsonProperty("groupId") final String groupId,
+      @JsonProperty("resource") final TaskResource taskResource,
+      @JsonProperty("supervisorTaskId") final String supervisorTaskId,
+      @JsonProperty("numAttempts") final int numAttempts, // zero-based counting
+      @JsonProperty("spec") final ParallelIndexIngestionSpec ingestionSchema,
+      @JsonProperty("context") final Map<String, Object> context,
+      @JacksonInject ObjectMapper jsonMapper
+  )
+  {
+    super(
+        getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()),
+        groupId,
+        taskResource,
+        ingestionSchema.getDataSchema(),
+        ingestionSchema.getTuningConfig(),
+        context
+    );
+
+    Preconditions.checkArgument(
+        ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec,
+        "%s partitionsSpec required",
+        HashedPartitionsSpec.NAME
+    );
+
+    this.numAttempts = numAttempts;
+    this.ingestionSchema = ingestionSchema;
+    this.supervisorTaskId = supervisorTaskId;
+    this.jsonMapper = jsonMapper;
+  }
+
+  @JsonProperty
+  private int getNumAttempts()
+  {
+    return numAttempts;
+  }
+
+  @JsonProperty("spec")
+  private ParallelIndexIngestionSpec getIngestionSchema()
+  {
+    return ingestionSchema;
+  }
+
+  @JsonProperty
+  private String getSupervisorTaskId()
+  {
+    return supervisorTaskId;
+  }
+
+  @Override
+  public String getType()
+  {
+    return TYPE;
+  }
+
+  @Override
+  public boolean isReady(TaskActionClient taskActionClient) throws Exception
+  {
+    return tryTimeChunkLock(
+        taskActionClient,
+        getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+    );
+  }
+
+  @Override
+  public TaskStatus runTask(TaskToolbox toolbox) throws Exception
+  {
+    DataSchema dataSchema = ingestionSchema.getDataSchema();
+    GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
+    ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
+
+    HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) tuningConfig.getPartitionsSpec();
+    Preconditions.checkNotNull(partitionsSpec, "partitionsSpec required in tuningConfig");
+
+    List<String> partitionDimensions = partitionsSpec.getPartitionDimensions();
+    if (partitionDimensions == null) {
+      partitionDimensions = HashBasedNumberedShardSpec.DEFAULT_PARTITION_DIMENSIONS;
+    }
+
+    InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
+        ingestionSchema.getDataSchema().getParser()
+    );
+    InputFormat inputFormat = inputSource.needsFormat()
+                              ? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema)
+                              : null;
+    final RowIngestionMeters buildSegmentsMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
+    final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+        buildSegmentsMeters,
+        tuningConfig.isLogParseExceptions(),
+        tuningConfig.getMaxParseExceptions(),
+        tuningConfig.getMaxSavedParseExceptions()
+    );
+
+    try (
+        final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
+            toolbox.getIndexingTmpDir(),
+            dataSchema,
+            inputSource,
+            inputFormat,
+            AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
+            buildSegmentsMeters,
+            parseExceptionHandler
+        );
+        HandlingInputRowIterator iterator =
+            new DefaultIndexTaskInputRowIteratorBuilder()
+                .delegate(inputRowIterator)
+                .granularitySpec(granularitySpec)
+                .build()
+    ) {
+      Map<Interval, byte[]> cardinalities = determineCardinalities(
+          iterator,
+          granularitySpec,
+          partitionDimensions
+      );
+
+      sendReport(
+          toolbox,
+          new DimensionCardinalityReport(getId(), cardinalities)
+      );
+    }
+
+    return TaskStatus.success(getId());
+  }
+
+  private Map<Interval, byte[]> determineCardinalities(
+      HandlingInputRowIterator inputRowIterator,
+      GranularitySpec granularitySpec,
+      List<String> partitionDimensions
+  )
+  {
+    Map<Interval, HyperLogLogCollector> intervalToCardinalities = new HashMap<>();
+    while (inputRowIterator.hasNext()) {
+      InputRow inputRow = inputRowIterator.next();
+      if (inputRow == null) {
+        continue;
+      }
+
+      DateTime timestamp = inputRow.getTimestamp();
+
+      //noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals)
+      Interval interval = granularitySpec.bucketInterval(timestamp).get();
+
+      LOG.info("TS: " + timestamp + " INTV: " + interval + " GSC: " + granularitySpec.getClass());

Review comment:
       What do you think about changing the logging level to reduce the amount of logging, since this message is printed for each row?
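
   A sketch of the suggestion (not the PR's code): demote the per-row message to debug, or drop it entirely.

   ```java
   // Demoted to debug so the message is not emitted for every input row.
   LOG.debug("TS: %s INTV: %s GSC: %s", timestamp, interval, granularitySpec.getClass());
   ```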

##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java
##########
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.druid.client.indexing.NoopIndexingServiceClient;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.impl.InlineInputSource;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
+import org.apache.druid.indexing.common.TaskInfoProvider;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
+import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.testing.junit.LoggerCaptureRule;
+import org.apache.logging.log4j.core.LogEvent;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+@RunWith(Enclosed.class)
+public class PartialDimensionCardinalityTaskTest

Review comment:
       I like the extensive set of tests!

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.HandlingInputRowIterator;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
+import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
+{
+  public static final String TYPE = "partial_dimension_cardinality";
+  private static final Logger LOG = new Logger(PartialDimensionCardinalityTask.class);
+
+  private final int numAttempts;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final String supervisorTaskId;
+
+  private final ObjectMapper jsonMapper;
+
+  @JsonCreator
+  PartialDimensionCardinalityTask(
+      // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask
+      @JsonProperty("id") @Nullable String id,
+      @JsonProperty("groupId") final String groupId,
+      @JsonProperty("resource") final TaskResource taskResource,
+      @JsonProperty("supervisorTaskId") final String supervisorTaskId,
+      @JsonProperty("numAttempts") final int numAttempts, // zero-based counting
+      @JsonProperty("spec") final ParallelIndexIngestionSpec ingestionSchema,
+      @JsonProperty("context") final Map<String, Object> context,
+      @JacksonInject ObjectMapper jsonMapper
+  )
+  {
+    super(
+        getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()),
+        groupId,
+        taskResource,
+        ingestionSchema.getDataSchema(),
+        ingestionSchema.getTuningConfig(),
+        context
+    );
+
+    Preconditions.checkArgument(
+        ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec,
+        "%s partitionsSpec required",
+        HashedPartitionsSpec.NAME
+    );
+
+    this.numAttempts = numAttempts;
+    this.ingestionSchema = ingestionSchema;
+    this.supervisorTaskId = supervisorTaskId;
+    this.jsonMapper = jsonMapper;
+  }
+
+  @JsonProperty
+  private int getNumAttempts()
+  {
+    return numAttempts;
+  }
+
+  @JsonProperty("spec")
+  private ParallelIndexIngestionSpec getIngestionSchema()
+  {
+    return ingestionSchema;
+  }
+
+  @JsonProperty
+  private String getSupervisorTaskId()
+  {
+    return supervisorTaskId;
+  }
+
+  @Override
+  public String getType()
+  {
+    return TYPE;
+  }
+
+  @Override
+  public boolean isReady(TaskActionClient taskActionClient) throws Exception
+  {
+    return tryTimeChunkLock(
+        taskActionClient,
+        getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+    );
+  }
+
+  @Override
+  public TaskStatus runTask(TaskToolbox toolbox) throws Exception
+  {
+    DataSchema dataSchema = ingestionSchema.getDataSchema();
+    GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
+    ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
+
+    HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) tuningConfig.getPartitionsSpec();
+    Preconditions.checkNotNull(partitionsSpec, "partitionsSpec required in tuningConfig");
+
+    List<String> partitionDimensions = partitionsSpec.getPartitionDimensions();
+    if (partitionDimensions == null) {
+      partitionDimensions = HashBasedNumberedShardSpec.DEFAULT_PARTITION_DIMENSIONS;
+    }
+
+    InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
+        ingestionSchema.getDataSchema().getParser()
+    );
+    InputFormat inputFormat = inputSource.needsFormat()
+                              ? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema)
+                              : null;
+    final RowIngestionMeters buildSegmentsMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
+    final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+        buildSegmentsMeters,
+        tuningConfig.isLogParseExceptions(),
+        tuningConfig.getMaxParseExceptions(),
+        tuningConfig.getMaxSavedParseExceptions()
+    );
+
+    try (
+        final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
+            toolbox.getIndexingTmpDir(),
+            dataSchema,
+            inputSource,
+            inputFormat,
+            AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
+            buildSegmentsMeters,
+            parseExceptionHandler
+        );
+        HandlingInputRowIterator iterator =
+            new DefaultIndexTaskInputRowIteratorBuilder()
+                .delegate(inputRowIterator)
+                .granularitySpec(granularitySpec)
+                .build()
+    ) {
+      Map<Interval, byte[]> cardinalities = determineCardinalities(
+          iterator,
+          granularitySpec,
+          partitionDimensions
+      );
+
+      sendReport(
+          toolbox,
+          new DimensionCardinalityReport(getId(), cardinalities)
+      );
+    }
+
+    return TaskStatus.success(getId());
+  }
+
+  private Map<Interval, byte[]> determineCardinalities(
+      HandlingInputRowIterator inputRowIterator,
+      GranularitySpec granularitySpec,
+      List<String> partitionDimensions
+  )
+  {
+    Map<Interval, HyperLogLogCollector> intervalToCardinalities = new HashMap<>();
+    while (inputRowIterator.hasNext()) {
+      InputRow inputRow = inputRowIterator.next();
+      if (inputRow == null) {
+        continue;
+      }
+
+      DateTime timestamp = inputRow.getTimestamp();
+
+      //noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals)
+      Interval interval = granularitySpec.bucketInterval(timestamp).get();
+
+      LOG.info("TS: " + timestamp + " INTV: " + interval + " GSC: " + granularitySpec.getClass());
+
+      HyperLogLogCollector hllCollector = intervalToCardinalities.computeIfAbsent(

Review comment:
       What do you think about using `HllSketch` instead of `HyperLogLogCollector`, since `HllSketch` provides much more accurate estimates if the cardinality does not exceed the sketch's `k` value: http://datasketches.apache.org/docs/HLL/HllSketchVsDruidHyperLogLogCollector.html. Using `HllSketch` will also be more accurate and faster when the partial HLLs are merged.
   
   Using `HllSketch` would mean that the implementation for parallel ingestion is different from [the one for sequential ingestion](https://github.com/apache/druid/blob/7cc0a7be68d37d07b18a7f3e75989534f2ec626d/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java#L754) though.
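
   A rough sketch of what that could look like, assuming the `org.apache.datasketches.hll` classes (the lgK value, class name, and method split are illustrative, not taken from this PR); workers could ship each sketch in the report via `toCompactByteArray()`, analogous to the collector bytes used here:

   ```java
   import org.apache.datasketches.hll.HllSketch;
   import org.apache.datasketches.hll.Union;

   class HllSketchCardinalityExample
   {
     private static final int LG_K = 12; // illustrative accuracy/size trade-off

     // Worker side: build one sketch per interval from the serialized group keys,
     // then send sketch.toCompactByteArray() in the report.
     static byte[] sketchFor(Iterable<byte[]> groupKeyBytes)
     {
       HllSketch sketch = new HllSketch(LG_K);
       for (byte[] key : groupKeyBytes) {
         sketch.update(key);
       }
       return sketch.toCompactByteArray();
     }

     // Supervisor side: merge the per-task sketches for one interval and read the estimate.
     static double mergedCardinality(Iterable<byte[]> serializedSketches)
     {
       Union union = new Union(LG_K);
       for (byte[] bytes : serializedSketches) {
         union.update(HllSketch.heapify(bytes));
       }
       return union.getResult().getEstimate();
     }
   }
   ```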

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.HandlingInputRowIterator;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
+import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
+{
+  public static final String TYPE = "partial_dimension_cardinality";
+  private static final Logger LOG = new Logger(PartialDimensionCardinalityTask.class);
+
+  private final int numAttempts;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final String supervisorTaskId;
+
+  private final ObjectMapper jsonMapper;
+
+  @JsonCreator
+  PartialDimensionCardinalityTask(
+      // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask
+      @JsonProperty("id") @Nullable String id,
+      @JsonProperty("groupId") final String groupId,
+      @JsonProperty("resource") final TaskResource taskResource,
+      @JsonProperty("supervisorTaskId") final String supervisorTaskId,
+      @JsonProperty("numAttempts") final int numAttempts, // zero-based counting
+      @JsonProperty("spec") final ParallelIndexIngestionSpec ingestionSchema,
+      @JsonProperty("context") final Map<String, Object> context,
+      @JacksonInject ObjectMapper jsonMapper
+  )
+  {
+    super(
+        getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()),
+        groupId,
+        taskResource,
+        ingestionSchema.getDataSchema(),
+        ingestionSchema.getTuningConfig(),
+        context
+    );
+
+    Preconditions.checkArgument(
+        ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec,
+        "%s partitionsSpec required",
+        HashedPartitionsSpec.NAME
+    );
+
+    this.numAttempts = numAttempts;
+    this.ingestionSchema = ingestionSchema;
+    this.supervisorTaskId = supervisorTaskId;
+    this.jsonMapper = jsonMapper;
+  }
+
+  @JsonProperty
+  private int getNumAttempts()
+  {
+    return numAttempts;
+  }
+
+  @JsonProperty("spec")
+  private ParallelIndexIngestionSpec getIngestionSchema()
+  {
+    return ingestionSchema;
+  }
+
+  @JsonProperty
+  private String getSupervisorTaskId()
+  {
+    return supervisorTaskId;
+  }
+
+  @Override
+  public String getType()
+  {
+    return TYPE;
+  }
+
+  @Override
+  public boolean isReady(TaskActionClient taskActionClient) throws Exception
+  {
+    return tryTimeChunkLock(
+        taskActionClient,
+        getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+    );
+  }

Review comment:
       I believe the logic in `PartialHashSegmentGenerateTask.isReady()` will need to be adjusted; otherwise, if `PartialDimensionCardinalityTask` runs and grabs the locks here, it will get stuck. For example, `PartialRangeSegmentGenerateTask.isReady()` does not grab the locks, since they're acquired earlier by `PartialDimensionDistributionTask`.
   
   I think a good place to add a test would be in `HashPartitionMultiPhaseParallelIndexingTest`. Currently, its test cases all specify a value of 2 for `numShards`, so we can add cases with `null`.
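
   If the locks really are acquired by the cardinality phase first, one possible shape for the adjustment (purely a hypothetical sketch, mirroring the `PartialRangeSegmentGenerateTask` behavior described above):

   ```java
   // Hypothetical: PartialHashSegmentGenerateTask skips re-acquiring the time chunk
   // locks because the earlier partial_dimension_cardinality phase already holds them.
   @Override
   public boolean isReady(TaskActionClient taskActionClient)
   {
     return true;
   }
   ```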

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -499,17 +518,62 @@ private TaskStatus runMultiPhaseParallel(TaskToolbox toolbox) throws Exception
 
   private TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Exception
   {
+    TaskState state;
+
+    if (!(ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec)) {
+      // only range and hash partitioning is supported for multiphase parallel ingestion, see runMultiPhaseParallel()
+      throw new ISE(
+          "forceGuaranteedRollup is set but partitionsSpec [%s] is not a ranged or hash partition spec.",
+          ingestionSchema.getTuningConfig().getPartitionsSpec()
+      );
+    }
+
+    final Integer numShardsOverride;
+    HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) ingestionSchema.getTuningConfig().getPartitionsSpec();
+    if (ingestionSchema.getTuningConfig().isForceGuaranteedRollup() && partitionsSpec.getNumShards() == null) {

Review comment:
       Checking `isForceGuaranteedRollup()` is probably redundant here
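
   i.e., a sketch of the simplification (assuming this branch is only reached when forceGuaranteedRollup is set):

   ```java
   // The isForceGuaranteedRollup() check can probably be dropped; the null check alone
   // decides whether the cardinality phase is needed.
   if (partitionsSpec.getNumShards() == null) {
     // run the partial_dimension_cardinality phase and derive numShardsOverride
   }
   ```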

##########
File path: core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java
##########
@@ -160,7 +160,7 @@ public Integer getNumShards()
   @Override
   public String getForceGuaranteedRollupIncompatiblityReason()
   {
-    return getNumShards() == null ? NUM_SHARDS + " must be specified" : FORCE_GUARANTEED_ROLLUP_COMPATIBLE;
+    return FORCE_GUARANTEED_ROLLUP_COMPATIBLE;

Review comment:
       https://druid.apache.org/docs/latest/ingestion/native-batch.html#hash-based-partitioning needs to be updated to say that `numShards` is no longer required




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] ccaominh commented on a change in pull request #10419: Automatically determine numShards for parallel ingestion hash partitioning

Posted by GitBox <gi...@apache.org>.
ccaominh commented on a change in pull request #10419:
URL: https://github.com/apache/druid/pull/10419#discussion_r493949005



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.HandlingInputRowIterator;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
+import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
+{
+  public static final String TYPE = "partial_dimension_cardinality";
+  private static final Logger LOG = new Logger(PartialDimensionCardinalityTask.class);
+
+  private final int numAttempts;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final String supervisorTaskId;
+
+  private final ObjectMapper jsonMapper;
+
+  @JsonCreator
+  PartialDimensionCardinalityTask(
+      // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask
+      @JsonProperty("id") @Nullable String id,
+      @JsonProperty("groupId") final String groupId,
+      @JsonProperty("resource") final TaskResource taskResource,
+      @JsonProperty("supervisorTaskId") final String supervisorTaskId,
+      @JsonProperty("numAttempts") final int numAttempts, // zero-based counting
+      @JsonProperty("spec") final ParallelIndexIngestionSpec ingestionSchema,
+      @JsonProperty("context") final Map<String, Object> context,
+      @JacksonInject ObjectMapper jsonMapper
+  )
+  {
+    super(
+        getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()),
+        groupId,
+        taskResource,
+        ingestionSchema.getDataSchema(),
+        ingestionSchema.getTuningConfig(),
+        context
+    );
+
+    Preconditions.checkArgument(
+        ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec,
+        "%s partitionsSpec required",
+        HashedPartitionsSpec.NAME
+    );
+
+    this.numAttempts = numAttempts;
+    this.ingestionSchema = ingestionSchema;
+    this.supervisorTaskId = supervisorTaskId;
+    this.jsonMapper = jsonMapper;
+  }
+
+  @JsonProperty
+  private int getNumAttempts()
+  {
+    return numAttempts;
+  }
+
+  @JsonProperty("spec")
+  private ParallelIndexIngestionSpec getIngestionSchema()
+  {
+    return ingestionSchema;
+  }
+
+  @JsonProperty
+  private String getSupervisorTaskId()
+  {
+    return supervisorTaskId;
+  }
+
+  @Override
+  public String getType()
+  {
+    return TYPE;
+  }
+
+  @Override
+  public boolean isReady(TaskActionClient taskActionClient) throws Exception
+  {
+    return tryTimeChunkLock(
+        taskActionClient,
+        getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+    );
+  }

Review comment:
       I wasn't able to reproduce the self-deadlock with `RangePartitionMultiPhaseParallelIndexingTest` or `ITPerfectRollupParallelIndexTest` in master, so either I misremembered or the issue has been fixed in the meantime.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson commented on a change in pull request #10419: Automatically determine numShards for parallel ingestion hash partitioning

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10419:
URL: https://github.com/apache/druid/pull/10419#discussion_r493940517



##########
File path: docs/ingestion/native-batch.md
##########
@@ -294,11 +294,17 @@ How the worker task creates segments is:
 |property|description|default|required?|
 |--------|-----------|-------|---------|
 |type|This should always be `hashed`|none|yes|
-|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data.|null|yes|
+|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data.|null|no|

Review comment:
       Oh, I should have been more specific. I meant the newly supported parameter, `maxRowsPerSegment` (or `targetRowsPerSegment`, which I suggested below). Neither of them is described here since we didn't support them before. Maybe https://github.com/apache/druid/blob/master/docs/ingestion/hadoop.md#hash-based-partitioning helps.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson commented on pull request #10419: Automatically determine numShards for parallel ingestion hash partitioning

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #10419:
URL: https://github.com/apache/druid/pull/10419#issuecomment-698508270


   One thing to note: after this PR, the parallel task can compute the number of partitions automatically, but the same number will be applied to all intervals. I think it would be better to compute and apply a different number of partitions per interval to handle potential data skew between intervals, which is what the simple task (`IndexTask`) does. But I'm OK with doing this improvement as a follow-up.
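
   For illustration only (not part of this PR), a per-interval variant could derive a separate shard count from each interval's own cardinality estimate, for example:

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import org.apache.druid.hll.HyperLogLogCollector;
   import org.joda.time.Interval;

   class PerIntervalShardsSketch
   {
     // Hypothetical helper: one shard count per interval instead of a single global value.
     static Map<Interval, Integer> computeNumShards(
         Map<Interval, HyperLogLogCollector> intervalToCollector,
         int maxRowsPerSegment
     )
     {
       Map<Interval, Integer> numShardsPerInterval = new HashMap<>();
       for (Map.Entry<Interval, HyperLogLogCollector> entry : intervalToCollector.entrySet()) {
         double cardinality = entry.getValue().estimateCardinality();
         int numShards = Math.max(1, (int) Math.ceil(cardinality / maxRowsPerSegment));
         numShardsPerInterval.put(entry.getKey(), numShards);
       }
       return numShardsPerInterval;
     }
   }
   ```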


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] ccaominh commented on a change in pull request #10419: Automatically determine numShards for parallel ingestion hash partitioning

Posted by GitBox <gi...@apache.org>.
ccaominh commented on a change in pull request #10419:
URL: https://github.com/apache/druid/pull/10419#discussion_r493077626



##########
File path: core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java
##########
@@ -160,7 +160,7 @@ public static int hash(ObjectMapper jsonMapper, List<String> partitionDimensions
   }
 
   @VisibleForTesting
-  static List<Object> getGroupKey(final List<String> partitionDimensions, final long timestamp, final InputRow inputRow)
+  public static List<Object> getGroupKey(final List<String> partitionDimensions, final long timestamp, final InputRow inputRow)

Review comment:
       Does it make sense to keep the `@VisibleForTesting` since this is now public (so that it can be used in `PartialDimensionCardinalityTask`)?

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -499,17 +518,62 @@ private TaskStatus runMultiPhaseParallel(TaskToolbox toolbox) throws Exception
 
   private TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Exception
   {
+    TaskState state;
+
+    if (!(ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec)) {
+      // only range and hash partitioning is supported for multiphase parallel ingestion, see runMultiPhaseParallel()
+      throw new ISE(
+          "forceGuaranteedRollup is set but partitionsSpec [%s] is not a ranged or hash partition spec.",

Review comment:
       Using "single_dim" instead of "ranged" in the error message maps better to the naming in the docs

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.HandlingInputRowIterator;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
+import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
+{
+  public static final String TYPE = "partial_dimension_cardinality";
+  private static final Logger LOG = new Logger(PartialDimensionCardinalityTask.class);
+
+  private final int numAttempts;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final String supervisorTaskId;
+
+  private final ObjectMapper jsonMapper;
+
+  @JsonCreator
+  PartialDimensionCardinalityTask(
+      // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask
+      @JsonProperty("id") @Nullable String id,
+      @JsonProperty("groupId") final String groupId,
+      @JsonProperty("resource") final TaskResource taskResource,
+      @JsonProperty("supervisorTaskId") final String supervisorTaskId,
+      @JsonProperty("numAttempts") final int numAttempts, // zero-based counting
+      @JsonProperty("spec") final ParallelIndexIngestionSpec ingestionSchema,
+      @JsonProperty("context") final Map<String, Object> context,
+      @JacksonInject ObjectMapper jsonMapper
+  )
+  {
+    super(
+        getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()),
+        groupId,
+        taskResource,
+        ingestionSchema.getDataSchema(),
+        ingestionSchema.getTuningConfig(),
+        context
+    );
+
+    Preconditions.checkArgument(
+        ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec,
+        "%s partitionsSpec required",
+        HashedPartitionsSpec.NAME
+    );
+
+    this.numAttempts = numAttempts;
+    this.ingestionSchema = ingestionSchema;
+    this.supervisorTaskId = supervisorTaskId;
+    this.jsonMapper = jsonMapper;
+  }
+
+  @JsonProperty
+  private int getNumAttempts()
+  {
+    return numAttempts;
+  }
+
+  @JsonProperty("spec")
+  private ParallelIndexIngestionSpec getIngestionSchema()
+  {
+    return ingestionSchema;
+  }
+
+  @JsonProperty
+  private String getSupervisorTaskId()
+  {
+    return supervisorTaskId;
+  }
+
+  @Override
+  public String getType()
+  {
+    return TYPE;
+  }
+
+  @Override
+  public boolean isReady(TaskActionClient taskActionClient) throws Exception
+  {
+    return tryTimeChunkLock(
+        taskActionClient,
+        getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+    );
+  }
+
+  @Override
+  public TaskStatus runTask(TaskToolbox toolbox) throws Exception
+  {
+    DataSchema dataSchema = ingestionSchema.getDataSchema();
+    GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
+    ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
+
+    HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) tuningConfig.getPartitionsSpec();
+    Preconditions.checkNotNull(partitionsSpec, "partitionsSpec required in tuningConfig");
+
+    List<String> partitionDimensions = partitionsSpec.getPartitionDimensions();
+    if (partitionDimensions == null) {
+      partitionDimensions = HashBasedNumberedShardSpec.DEFAULT_PARTITION_DIMENSIONS;
+    }
+
+    InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
+        ingestionSchema.getDataSchema().getParser()
+    );
+    InputFormat inputFormat = inputSource.needsFormat()
+                              ? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema)
+                              : null;
+    final RowIngestionMeters buildSegmentsMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
+    final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+        buildSegmentsMeters,
+        tuningConfig.isLogParseExceptions(),
+        tuningConfig.getMaxParseExceptions(),
+        tuningConfig.getMaxSavedParseExceptions()
+    );
+
+    try (
+        final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
+            toolbox.getIndexingTmpDir(),
+            dataSchema,
+            inputSource,
+            inputFormat,
+            AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
+            buildSegmentsMeters,
+            parseExceptionHandler
+        );
+        HandlingInputRowIterator iterator =
+            new DefaultIndexTaskInputRowIteratorBuilder()
+                .delegate(inputRowIterator)
+                .granularitySpec(granularitySpec)
+                .build()
+    ) {
+      Map<Interval, byte[]> cardinalities = determineCardinalities(
+          iterator,
+          granularitySpec,
+          partitionDimensions
+      );
+
+      sendReport(
+          toolbox,
+          new DimensionCardinalityReport(getId(), cardinalities)
+      );
+    }
+
+    return TaskStatus.success(getId());
+  }
+
+  private Map<Interval, byte[]> determineCardinalities(
+      HandlingInputRowIterator inputRowIterator,
+      GranularitySpec granularitySpec,
+      List<String> partitionDimensions
+  )
+  {
+    Map<Interval, HyperLogLogCollector> intervalToCardinalities = new HashMap<>();
+    while (inputRowIterator.hasNext()) {
+      InputRow inputRow = inputRowIterator.next();
+      if (inputRow == null) {
+        continue;
+      }
+
+      DateTime timestamp = inputRow.getTimestamp();
+
+      //noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals)
+      Interval interval = granularitySpec.bucketInterval(timestamp).get();
+
+      LOG.info("TS: " + timestamp + " INTV: " + interval + " GSC: " + granularitySpec.getClass());

Review comment:
       What do you think about changing the logging level to reduce the amount of logging, since this message is printed for each row?

##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java
##########
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.druid.client.indexing.NoopIndexingServiceClient;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.impl.InlineInputSource;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
+import org.apache.druid.indexing.common.TaskInfoProvider;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
+import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.testing.junit.LoggerCaptureRule;
+import org.apache.logging.log4j.core.LogEvent;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+@RunWith(Enclosed.class)
+public class PartialDimensionCardinalityTaskTest

Review comment:
       I like the extensive set of tests!

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.HandlingInputRowIterator;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
+import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
+{
+  public static final String TYPE = "partial_dimension_cardinality";
+  private static final Logger LOG = new Logger(PartialDimensionCardinalityTask.class);
+
+  private final int numAttempts;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final String supervisorTaskId;
+
+  private final ObjectMapper jsonMapper;
+
+  @JsonCreator
+  PartialDimensionCardinalityTask(
+      // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask
+      @JsonProperty("id") @Nullable String id,
+      @JsonProperty("groupId") final String groupId,
+      @JsonProperty("resource") final TaskResource taskResource,
+      @JsonProperty("supervisorTaskId") final String supervisorTaskId,
+      @JsonProperty("numAttempts") final int numAttempts, // zero-based counting
+      @JsonProperty("spec") final ParallelIndexIngestionSpec ingestionSchema,
+      @JsonProperty("context") final Map<String, Object> context,
+      @JacksonInject ObjectMapper jsonMapper
+  )
+  {
+    super(
+        getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()),
+        groupId,
+        taskResource,
+        ingestionSchema.getDataSchema(),
+        ingestionSchema.getTuningConfig(),
+        context
+    );
+
+    Preconditions.checkArgument(
+        ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec,
+        "%s partitionsSpec required",
+        HashedPartitionsSpec.NAME
+    );
+
+    this.numAttempts = numAttempts;
+    this.ingestionSchema = ingestionSchema;
+    this.supervisorTaskId = supervisorTaskId;
+    this.jsonMapper = jsonMapper;
+  }
+
+  @JsonProperty
+  private int getNumAttempts()
+  {
+    return numAttempts;
+  }
+
+  @JsonProperty("spec")
+  private ParallelIndexIngestionSpec getIngestionSchema()
+  {
+    return ingestionSchema;
+  }
+
+  @JsonProperty
+  private String getSupervisorTaskId()
+  {
+    return supervisorTaskId;
+  }
+
+  @Override
+  public String getType()
+  {
+    return TYPE;
+  }
+
+  @Override
+  public boolean isReady(TaskActionClient taskActionClient) throws Exception
+  {
+    return tryTimeChunkLock(
+        taskActionClient,
+        getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+    );
+  }
+
+  @Override
+  public TaskStatus runTask(TaskToolbox toolbox) throws Exception
+  {
+    DataSchema dataSchema = ingestionSchema.getDataSchema();
+    GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
+    ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
+
+    HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) tuningConfig.getPartitionsSpec();
+    Preconditions.checkNotNull(partitionsSpec, "partitionsSpec required in tuningConfig");
+
+    List<String> partitionDimensions = partitionsSpec.getPartitionDimensions();
+    if (partitionDimensions == null) {
+      partitionDimensions = HashBasedNumberedShardSpec.DEFAULT_PARTITION_DIMENSIONS;
+    }
+
+    InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
+        ingestionSchema.getDataSchema().getParser()
+    );
+    InputFormat inputFormat = inputSource.needsFormat()
+                              ? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema)
+                              : null;
+    final RowIngestionMeters buildSegmentsMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
+    final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+        buildSegmentsMeters,
+        tuningConfig.isLogParseExceptions(),
+        tuningConfig.getMaxParseExceptions(),
+        tuningConfig.getMaxSavedParseExceptions()
+    );
+
+    try (
+        final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
+            toolbox.getIndexingTmpDir(),
+            dataSchema,
+            inputSource,
+            inputFormat,
+            AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
+            buildSegmentsMeters,
+            parseExceptionHandler
+        );
+        HandlingInputRowIterator iterator =
+            new DefaultIndexTaskInputRowIteratorBuilder()
+                .delegate(inputRowIterator)
+                .granularitySpec(granularitySpec)
+                .build()
+    ) {
+      Map<Interval, byte[]> cardinalities = determineCardinalities(
+          iterator,
+          granularitySpec,
+          partitionDimensions
+      );
+
+      sendReport(
+          toolbox,
+          new DimensionCardinalityReport(getId(), cardinalities)
+      );
+    }
+
+    return TaskStatus.success(getId());
+  }
+
+  private Map<Interval, byte[]> determineCardinalities(
+      HandlingInputRowIterator inputRowIterator,
+      GranularitySpec granularitySpec,
+      List<String> partitionDimensions
+  )
+  {
+    Map<Interval, HyperLogLogCollector> intervalToCardinalities = new HashMap<>();
+    while (inputRowIterator.hasNext()) {
+      InputRow inputRow = inputRowIterator.next();
+      if (inputRow == null) {
+        continue;
+      }
+
+      DateTime timestamp = inputRow.getTimestamp();
+
+      //noinspection OptionalGetWithoutIsPresent (InputRowIterator returns rows with present intervals)
+      Interval interval = granularitySpec.bucketInterval(timestamp).get();
+
+      LOG.info("TS: " + timestamp + " INTV: " + interval + " GSC: " + granularitySpec.getClass());
+
+      HyperLogLogCollector hllCollector = intervalToCardinalities.computeIfAbsent(

Review comment:
       What do you think about using `HllSketch` instead of `HyperLogLogCollector`? `HllSketch` provides much more accurate estimates when the cardinality does not exceed the sketch's `k` value (see http://datasketches.apache.org/docs/HLL/HllSketchVsDruidHyperLogLogCollector.html), and merging the partial sketches will also be more accurate and faster than merging the partial HLL collectors.
   
   Using `HllSketch` would mean that the implementation for parallel ingestion differs from [the one for sequential ingestion](https://github.com/apache/druid/blob/7cc0a7be68d37d07b18a7f3e75989534f2ec626d/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java#L754), though.
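   A minimal sketch of that approach (not the PR's code; the lgK of 11, the '|'-joined group key, and the compact-byte-array serialization are illustrative assumptions, and it relies on the quoted file's imports plus `org.apache.datasketches.hll.HllSketch`):

  // Hedged sketch only: per-interval cardinality of the partition dimensions via HllSketch.
  private Map<Interval, byte[]> determineCardinalitiesWithHllSketch(
      HandlingInputRowIterator inputRowIterator,
      GranularitySpec granularitySpec,
      List<String> partitionDimensions
  )
  {
    Map<Interval, HllSketch> intervalToSketch = new HashMap<>();
    while (inputRowIterator.hasNext()) {
      InputRow inputRow = inputRowIterator.next();
      //noinspection OptionalGetWithoutIsPresent (the iterator only returns rows with present intervals)
      Interval interval = granularitySpec.bucketInterval(inputRow.getTimestamp()).get();
      HllSketch sketch = intervalToSketch.computeIfAbsent(interval, key -> new HllSketch(11));
      // Build an illustrative group key from the partition dimension values and feed it to the sketch.
      StringBuilder groupKey = new StringBuilder();
      for (String dim : partitionDimensions) {
        groupKey.append(dim).append('=').append(inputRow.getRaw(dim)).append('|');
      }
      sketch.update(groupKey.toString());
    }
    // Serialize each sketch so it can be sent back to the supervisor task and merged there.
    Map<Interval, byte[]> intervalToBytes = new HashMap<>();
    intervalToSketch.forEach((interval, sketch) -> intervalToBytes.put(interval, sketch.toCompactByteArray()));
    return intervalToBytes;
  }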

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.HandlingInputRowIterator;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
+import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
+{
+  public static final String TYPE = "partial_dimension_cardinality";
+  private static final Logger LOG = new Logger(PartialDimensionCardinalityTask.class);
+
+  private final int numAttempts;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final String supervisorTaskId;
+
+  private final ObjectMapper jsonMapper;
+
+  @JsonCreator
+  PartialDimensionCardinalityTask(
+      // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask
+      @JsonProperty("id") @Nullable String id,
+      @JsonProperty("groupId") final String groupId,
+      @JsonProperty("resource") final TaskResource taskResource,
+      @JsonProperty("supervisorTaskId") final String supervisorTaskId,
+      @JsonProperty("numAttempts") final int numAttempts, // zero-based counting
+      @JsonProperty("spec") final ParallelIndexIngestionSpec ingestionSchema,
+      @JsonProperty("context") final Map<String, Object> context,
+      @JacksonInject ObjectMapper jsonMapper
+  )
+  {
+    super(
+        getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()),
+        groupId,
+        taskResource,
+        ingestionSchema.getDataSchema(),
+        ingestionSchema.getTuningConfig(),
+        context
+    );
+
+    Preconditions.checkArgument(
+        ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec,
+        "%s partitionsSpec required",
+        HashedPartitionsSpec.NAME
+    );
+
+    this.numAttempts = numAttempts;
+    this.ingestionSchema = ingestionSchema;
+    this.supervisorTaskId = supervisorTaskId;
+    this.jsonMapper = jsonMapper;
+  }
+
+  @JsonProperty
+  private int getNumAttempts()
+  {
+    return numAttempts;
+  }
+
+  @JsonProperty("spec")
+  private ParallelIndexIngestionSpec getIngestionSchema()
+  {
+    return ingestionSchema;
+  }
+
+  @JsonProperty
+  private String getSupervisorTaskId()
+  {
+    return supervisorTaskId;
+  }
+
+  @Override
+  public String getType()
+  {
+    return TYPE;
+  }
+
+  @Override
+  public boolean isReady(TaskActionClient taskActionClient) throws Exception
+  {
+    return tryTimeChunkLock(
+        taskActionClient,
+        getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+    );
+  }

Review comment:
       I believe the logic in `PartialHashSegmentGenerateTask.isReady()` will need to be adjusted; otherwise, if `PartialDimensionCardinalityTask` runs and grabs the locks here, the subsequent segment-generation phase will get stuck. For example, `PartialRangeSegmentGenerateTask.isReady()` does not grab the locks since they're acquired earlier by `PartialDimensionDistributionTask`.
   
   I think a good place to add a test would be in `HashPartitionMultiPhaseParallelIndexingTest`. Currently, its test cases all specify a value of 2 for `numShards`, so we can add cases with `null`.
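   A minimal sketch of that adjustment, assuming the hash generate phase follows the same pattern as `PartialRangeSegmentGenerateTask` (not the PR's final code):

  @Override
  public boolean isReady(TaskActionClient taskActionClient)
  {
    // Hedged sketch: skip re-acquiring the time chunk locks here, since
    // PartialDimensionCardinalityTask acquires them in the preceding phase.
    return true;
  }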

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -499,17 +518,62 @@ private TaskStatus runMultiPhaseParallel(TaskToolbox toolbox) throws Exception
 
   private TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Exception
   {
+    TaskState state;
+
+    if (!(ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec)) {
+      // only range and hash partitioning is supported for multiphase parallel ingestion, see runMultiPhaseParallel()
+      throw new ISE(
+          "forceGuaranteedRollup is set but partitionsSpec [%s] is not a ranged or hash partition spec.",
+          ingestionSchema.getTuningConfig().getPartitionsSpec()
+      );
+    }
+
+    final Integer numShardsOverride;
+    HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) ingestionSchema.getTuningConfig().getPartitionsSpec();
+    if (ingestionSchema.getTuningConfig().isForceGuaranteedRollup() && partitionsSpec.getNumShards() == null) {

Review comment:
       Checking `isForceGuaranteedRollup()` is probably redundant here, since this method is only reached when `forceGuaranteedRollup` is set.
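   A minimal sketch of the simplified guard, assuming (as the ISE message above implies) that this method is only invoked on the `forceGuaranteedRollup` code path:

    // Illustrative only: no need to re-check isForceGuaranteedRollup() here.
    if (partitionsSpec.getNumShards() == null) {
      // schedule the partial dimension cardinality phase to estimate numShards
    }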

##########
File path: core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java
##########
@@ -160,7 +160,7 @@ public Integer getNumShards()
   @Override
   public String getForceGuaranteedRollupIncompatiblityReason()
   {
-    return getNumShards() == null ? NUM_SHARDS + " must be specified" : FORCE_GUARANTEED_ROLLUP_COMPATIBLE;
+    return FORCE_GUARANTEED_ROLLUP_COMPATIBLE;

Review comment:
       https://druid.apache.org/docs/latest/ingestion/native-batch.html#hash-based-partitioning needs to be updated to say that `numShards` is no longer required

##########
File path: core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java
##########
@@ -160,7 +160,7 @@ public Integer getNumShards()
   @Override
   public String getForceGuaranteedRollupIncompatiblityReason()
   {
-    return getNumShards() == null ? NUM_SHARDS + " must be specified" : FORCE_GUARANTEED_ROLLUP_COMPATIBLE;
+    return FORCE_GUARANTEED_ROLLUP_COMPATIBLE;

Review comment:
       https://github.com/apache/druid/blob/master/docs/ingestion/native-batch.md#hash-based-partitioning needs to be updated to say that `numShards` is no longer required and also to mention the new `partial dimension cardinality` task.






[GitHub] [druid] jon-wei commented on a change in pull request #10419: Automatically determine numShards for parallel ingestion hash partitioning

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #10419:
URL: https://github.com/apache/druid/pull/10419#discussion_r493978044



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.datasketches.hll.HllSketch;
+import org.apache.druid.data.input.HandlingInputRowIterator;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
+import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
+{
+  public static final String TYPE = "partial_dimension_cardinality";
+  private static final Logger LOG = new Logger(PartialDimensionCardinalityTask.class);
+
+  private final int numAttempts;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final String supervisorTaskId;
+
+  private final ObjectMapper jsonMapper;
+
+  @JsonCreator
+  PartialDimensionCardinalityTask(
+      // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask
+      @JsonProperty("id") @Nullable String id,
+      @JsonProperty("groupId") final String groupId,
+      @JsonProperty("resource") final TaskResource taskResource,
+      @JsonProperty("supervisorTaskId") final String supervisorTaskId,
+      @JsonProperty("numAttempts") final int numAttempts, // zero-based counting
+      @JsonProperty("spec") final ParallelIndexIngestionSpec ingestionSchema,
+      @JsonProperty("context") final Map<String, Object> context,
+      @JacksonInject ObjectMapper jsonMapper
+  )
+  {
+    super(
+        getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()),
+        groupId,
+        taskResource,
+        ingestionSchema.getDataSchema(),
+        ingestionSchema.getTuningConfig(),
+        context
+    );
+
+    Preconditions.checkArgument(
+        ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec,
+        "%s partitionsSpec required",
+        HashedPartitionsSpec.NAME
+    );
+
+    this.numAttempts = numAttempts;
+    this.ingestionSchema = ingestionSchema;
+    this.supervisorTaskId = supervisorTaskId;
+    this.jsonMapper = jsonMapper;
+  }
+
+  @JsonProperty
+  private int getNumAttempts()
+  {
+    return numAttempts;
+  }
+
+  @JsonProperty("spec")
+  private ParallelIndexIngestionSpec getIngestionSchema()
+  {
+    return ingestionSchema;
+  }
+
+  @JsonProperty
+  private String getSupervisorTaskId()
+  {
+    return supervisorTaskId;
+  }
+
+  @Override
+  public String getType()
+  {
+    return TYPE;
+  }
+
+  @Override
+  public boolean isReady(TaskActionClient taskActionClient) throws Exception
+  {
+    return tryTimeChunkLock(
+        taskActionClient,
+        getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+    );
+  }
+
+  @Override
+  public TaskStatus runTask(TaskToolbox toolbox) throws Exception
+  {
+    DataSchema dataSchema = ingestionSchema.getDataSchema();
+    GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
+    ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
+
+    HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) tuningConfig.getPartitionsSpec();
+    Preconditions.checkNotNull(partitionsSpec, "partitionsSpec required in tuningConfig");
+
+    List<String> partitionDimensions = partitionsSpec.getPartitionDimensions();
+    if (partitionDimensions == null) {
+      partitionDimensions = HashBasedNumberedShardSpec.DEFAULT_PARTITION_DIMENSIONS;
+    }
+
+    InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
+        ingestionSchema.getDataSchema().getParser()
+    );
+    InputFormat inputFormat = inputSource.needsFormat()
+                              ? ParallelIndexSupervisorTask.getInputFormat(ingestionSchema)
+                              : null;
+    final RowIngestionMeters buildSegmentsMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
+    final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
+        buildSegmentsMeters,
+        tuningConfig.isLogParseExceptions(),
+        tuningConfig.getMaxParseExceptions(),
+        tuningConfig.getMaxSavedParseExceptions()
+    );
+
+    try (
+        final CloseableIterator<InputRow> inputRowIterator = AbstractBatchIndexTask.inputSourceReader(
+            toolbox.getIndexingTmpDir(),
+            dataSchema,
+            inputSource,
+            inputFormat,
+            AbstractBatchIndexTask.defaultRowFilter(granularitySpec),
+            buildSegmentsMeters,
+            parseExceptionHandler
+        );
+        HandlingInputRowIterator iterator =
+            new DefaultIndexTaskInputRowIteratorBuilder()
+                .delegate(inputRowIterator)
+                .granularitySpec(granularitySpec)
+                .build()
+    ) {
+      Map<Interval, byte[]> cardinalities = determineCardinalities(
+          iterator,
+          granularitySpec,
+          partitionDimensions
+      );
+
+      sendReport(
+          toolbox,
+          new DimensionCardinalityReport(getId(), cardinalities)
+      );
+    }
+
+    return TaskStatus.success(getId());
+  }
+
+  private Map<Interval, byte[]> determineCardinalities(
+      HandlingInputRowIterator inputRowIterator,
+      GranularitySpec granularitySpec,
+      List<String> partitionDimensions
+  )
+  {
+    Map<Interval, HllSketch> intervalToCardinalities = new HashMap<>();
+    while (inputRowIterator.hasNext()) {
+      InputRow inputRow = inputRowIterator.next();
+      if (inputRow == null) {

Review comment:
       Removed the null check






[GitHub] [druid] jon-wei commented on a change in pull request #10419: Automatically determine numShards for parallel ingestion hash partitioning

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #10419:
URL: https://github.com/apache/druid/pull/10419#discussion_r493977947



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -582,6 +652,50 @@ private TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) thro
     return TaskStatus.fromCode(getId(), mergeState);
   }
 
+  @VisibleForTesting
+  public static int determineNumShardsFromCardinalityReport(
+      Collection<DimensionCardinalityReport> reports,
+      int maxRowsPerSegment
+  )
+  {
+    // aggregate all the sub-reports
+    Map<Interval, Union> finalCollectors = new HashMap<>();
+    reports.forEach(report -> {
+      Map<Interval, byte[]> intervalToCardinality = report.getIntervalToCardinalities();
+      for (Map.Entry<Interval, byte[]> entry : intervalToCardinality.entrySet()) {
+        Union union = finalCollectors.computeIfAbsent(
+            entry.getKey(),
+            (key) -> {
+              return new Union(DimensionCardinalityReport.HLL_SKETCH_LOG_K);
+            }
+        );
+        HllSketch entryHll = HllSketch.wrap(Memory.wrap(entry.getValue()));
+        union.update(entryHll);
+      }
+    });
+
+    // determine the highest cardinality in any interval
+    long maxCardinality = Long.MIN_VALUE;
+    for (Union union : finalCollectors.values()) {
+      maxCardinality = Math.max(maxCardinality, (long) union.getEstimate());
+    }
+
+    LOG.info("Estimated max cardinality: " + maxCardinality);
+
+    // determine numShards based on maxRowsPerSegment and the highest per-interval cardinality
+    long numShards = maxCardinality / maxRowsPerSegment;
+    if (maxCardinality % maxRowsPerSegment != 0) {
+      // if there's a remainder add 1 so we stay under maxRowsPerSegment
+      numShards += 1;
+    }
+    try {
+      return Math.toIntExact(numShards);
+    }
+    catch (ArithmeticException ae) {
+      return Integer.MAX_VALUE;

Review comment:
       I changed this to throw an exception instead of returning `Integer.MAX_VALUE`.
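   The adjusted overflow handling presumably looks something like the sketch below; the exact exception type and message are assumptions:

    try {
      return Math.toIntExact(numShards);
    }
    catch (ArithmeticException ae) {
      // An overflow means the estimated cardinality divided by maxRowsPerSegment does not
      // fit in an int; surfacing it is clearer than silently capping at Integer.MAX_VALUE.
      throw new ISE("Estimated numShards [%s] exceeds integer range", numShards);
    }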






[GitHub] [druid] jon-wei commented on a change in pull request #10419: Automatically determine numShards for parallel ingestion hash partitioning

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #10419:
URL: https://github.com/apache/druid/pull/10419#discussion_r493978244



##########
File path: docs/ingestion/native-batch.md
##########
@@ -294,11 +294,17 @@ How the worker task creates segments is:
 |property|description|default|required?|
 |--------|-----------|-------|---------|
 |type|This should always be `hashed`|none|yes|
-|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data.|null|yes|
+|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data.|null|no|
 |partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions.|null|no|
 
 The Parallel task with hash-based partitioning is similar to [MapReduce](https://en.wikipedia.org/wiki/MapReduce).
-The task runs in 2 phases, i.e., `partial segment generation` and `partial segment merge`.
+The task runs in up to 3 phases: `partial_dimension_cardinality`, `partial segment generation` and `partial segment merge`.

Review comment:
       Removed the underscores here
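   With the underscores removed, the phase list presumably reads:
   
   The task runs in up to 3 phases: `partial dimension cardinality`, `partial segment generation` and `partial segment merge`.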






[GitHub] [druid] jon-wei commented on a change in pull request #10419: Automatically determine numShards for parallel ingestion hash partitioning

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #10419:
URL: https://github.com/apache/druid/pull/10419#discussion_r493118246



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.HandlingInputRowIterator;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
+import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class PartialDimensionCardinalityTask extends PerfectRollupWorkerTask
+{
+  public static final String TYPE = "partial_dimension_cardinality";
+  private static final Logger LOG = new Logger(PartialDimensionCardinalityTask.class);
+
+  private final int numAttempts;
+  private final ParallelIndexIngestionSpec ingestionSchema;
+  private final String supervisorTaskId;
+
+  private final ObjectMapper jsonMapper;
+
+  @JsonCreator
+  PartialDimensionCardinalityTask(
+      // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask
+      @JsonProperty("id") @Nullable String id,
+      @JsonProperty("groupId") final String groupId,
+      @JsonProperty("resource") final TaskResource taskResource,
+      @JsonProperty("supervisorTaskId") final String supervisorTaskId,
+      @JsonProperty("numAttempts") final int numAttempts, // zero-based counting
+      @JsonProperty("spec") final ParallelIndexIngestionSpec ingestionSchema,
+      @JsonProperty("context") final Map<String, Object> context,
+      @JacksonInject ObjectMapper jsonMapper
+  )
+  {
+    super(
+        getOrMakeId(id, TYPE, ingestionSchema.getDataSchema().getDataSource()),
+        groupId,
+        taskResource,
+        ingestionSchema.getDataSchema(),
+        ingestionSchema.getTuningConfig(),
+        context
+    );
+
+    Preconditions.checkArgument(
+        ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof HashedPartitionsSpec,
+        "%s partitionsSpec required",
+        HashedPartitionsSpec.NAME
+    );
+
+    this.numAttempts = numAttempts;
+    this.ingestionSchema = ingestionSchema;
+    this.supervisorTaskId = supervisorTaskId;
+    this.jsonMapper = jsonMapper;
+  }
+
+  @JsonProperty
+  private int getNumAttempts()
+  {
+    return numAttempts;
+  }
+
+  @JsonProperty("spec")
+  private ParallelIndexIngestionSpec getIngestionSchema()
+  {
+    return ingestionSchema;
+  }
+
+  @JsonProperty
+  private String getSupervisorTaskId()
+  {
+    return supervisorTaskId;
+  }
+
+  @Override
+  public String getType()
+  {
+    return TYPE;
+  }
+
+  @Override
+  public boolean isReady(TaskActionClient taskActionClient) throws Exception
+  {
+    return tryTimeChunkLock(
+        taskActionClient,
+        getIngestionSchema().getDataSchema().getGranularitySpec().inputIntervals()
+    );
+  }

Review comment:
       Thanks, will look into this



