Posted to commits@druid.apache.org by "kfaraz (via GitHub)" <gi...@apache.org> on 2023/02/01 05:09:07 UTC

[GitHub] [druid] kfaraz commented on a diff in pull request #13303: Hadoop based batch ingestion support range partition

kfaraz commented on code in PR #13303:
URL: https://github.com/apache/druid/pull/13303#discussion_r1092765726


##########
indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java:
##########
@@ -346,10 +361,14 @@ protected void innerMap(
           rollupGranularity.bucketStart(inputRow.getTimestamp()).getMillis(),
           inputRow
       );
-      context.write(
-          new BytesWritable(HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsBytes(groupKey)),
-          NullWritable.get()
-      );
+
+      final byte[] bytes = HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsBytes(groupKey);
+      if (sample <= 1 || Math.abs(HASH_FUNCTION.hashBytes(bytes).asInt()) % sample == 0) {

Review Comment:
   Why are we using a hash of the bytes here? Doesn't this mean that we will never emit certain group keys?
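   
   To make the concern concrete, here is a minimal sketch (assuming `HASH_FUNCTION` is a Guava murmur3 hash, as is common elsewhere in Druid; the keys are hypothetical). Since the predicate depends only on the key bytes, every row carrying a given group key gets the same verdict, so a key is either always emitted or never emitted:
   
   ```java
   import com.google.common.hash.HashFunction;
   import com.google.common.hash.Hashing;
   import java.nio.charset.StandardCharsets;
   
   public class HashSampleSketch
   {
     // Assumed stand-in for the job's HASH_FUNCTION.
     private static final HashFunction HASH_FUNCTION = Hashing.murmur3_32();
   
     public static void main(String[] args)
     {
       final int sample = 10;
       for (String key : new String[]{"page=foo", "page=bar", "page=baz"}) {
         final byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
         // Deterministic per key: the same key hashes identically on every row,
         // so roughly 1/sample of the *keys* survive, not 1/sample of rows per key.
         final boolean emit = sample <= 1
                              || Math.abs(HASH_FUNCTION.hashBytes(bytes).asInt()) % sample == 0;
         System.out.println(key + " -> emit=" + emit);
       }
     }
   }
   ```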
   



##########
core/src/main/java/org/apache/druid/indexer/partitions/DimensionRangePartitionsSpec.java:
##########
@@ -144,6 +146,17 @@ public List<String> getPartitionDimensions()
     return partitionDimensions;
   }
 
+  public DimensionRangeShardSpec createShardSpec(

Review Comment:
   It is better to have this logic in `DeterminePartitionsJob` since it is only used there. You can check the number of dimensions and create either a `DimensionRangeShardSpec` or a `SingleDimensionShardSpec`.
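   
   A rough sketch of that suggestion (hypothetical helper; the five-argument constructor shapes are assumptions based on the existing shard spec classes):
   
   ```java
   import java.util.List;
   
   import org.apache.druid.data.input.StringTuple;
   import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
   import org.apache.druid.timeline.partition.ShardSpec;
   import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
   
   // Hypothetical helper inside DeterminePartitionsJob: pick the shard spec
   // type based on how many partition dimensions were configured.
   static ShardSpec createShardSpec(
       List<String> partitionDimensions,
       StringTuple start,
       StringTuple end,
       int partitionNum
   )
   {
     if (partitionDimensions.size() == 1) {
       // Single dimension: keep the legacy SingleDimensionShardSpec.
       return new SingleDimensionShardSpec(
           partitionDimensions.get(0),
           start == null ? null : start.get(0),
           end == null ? null : end.get(0),
           partitionNum,
           null
       );
     }
     // Multiple dimensions: use DimensionRangeShardSpec.
     return new DimensionRangeShardSpec(partitionDimensions, start, end, partitionNum, null);
   }
   ```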



##########
indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java:
##########
@@ -346,10 +361,14 @@ protected void innerMap(
           rollupGranularity.bucketStart(inputRow.getTimestamp()).getMillis(),
           inputRow
       );
-      context.write(
-          new BytesWritable(HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsBytes(groupKey)),
-          NullWritable.get()
-      );
+
+      final byte[] bytes = HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsBytes(groupKey);
+      if (sample <= 1 || Math.abs(HASH_FUNCTION.hashBytes(bytes).asInt()) % sample == 0) {
+        context.write(

Review Comment:
   Nit: This can be put on a single line now.
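   
   For reference, a sketch of the single-line form (using the identifiers from the diff above):
   
   ```java
   if (sample <= 1 || Math.abs(HASH_FUNCTION.hashBytes(bytes).asInt()) % sample == 0) {
     context.write(new BytesWritable(bytes), NullWritable.get());
   }
   ```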



##########
indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java:
##########
@@ -476,22 +502,29 @@ void emitDimValueCounts(
       final byte[] groupKey = buf.array();
 
       // Emit row-counter value.
-      write(context, groupKey, new DimValueCount("", "", 1));
-
-      for (final Map.Entry<String, Iterable<String>> dimAndValues : dims.entrySet()) {
-        final String dim = dimAndValues.getKey();
-
-        if (partitionDimension == null || partitionDimension.equals(dim)) {
-          final Iterable<String> dimValues = dimAndValues.getValue();
-
-          if (Iterables.size(dimValues) == 1) {
-            // Emit this value.
-            write(context, groupKey, new DimValueCount(dim, Iterables.getOnlyElement(dimValues), 1));
+      write(context, groupKey, new DimValueCount(Collections.emptyList(), StringTuple.create(), 1));
+
+      Iterator<List<String>> dimensionGroupIterator = dimensionGroupingSet.iterator();

Review Comment:
   Ah, I see it now. Thanks for the clarification. 
   
   In the case of single_dim too, I was wondering if we even need this check anymore. I don't think we can submit a spec with a null partition dimension even for single_dim, at least for index_parallel. There is a check in `ParallelIndexTuningConfig` which validates that partition dimensions are always specified.
   
   We should have a similar check in `HadoopTuningConfig` too. Silently choosing a partition dimension behind the scenes is probably not great; it is better to fail if no partition dimension has been specified. What do you think?
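   
   A minimal sketch of such a check (hypothetical placement and method name; it mirrors the spirit of the `ParallelIndexTuningConfig` validation mentioned above):
   
   ```java
   import java.util.List;
   
   import com.google.common.base.Preconditions;
   import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
   import org.apache.druid.indexer.partitions.PartitionsSpec;
   
   // Hypothetical validation for HadoopTuningConfig: fail fast instead of
   // silently picking a partition dimension behind the scenes.
   static void validatePartitionDimensions(PartitionsSpec partitionsSpec)
   {
     if (partitionsSpec instanceof DimensionRangePartitionsSpec) {
       final List<String> dims =
           ((DimensionRangePartitionsSpec) partitionsSpec).getPartitionDimensions();
       Preconditions.checkArgument(
           dims != null && !dims.isEmpty(),
           "partitionDimensions must be specified for range partitioning"
       );
     }
   }
   ```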



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

