You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2022/09/12 03:34:03 UTC

[GitHub] [druid] cryptoe commented on a diff in pull request #12998: Update ClusterByStatisticsCollectorImpl to use bytes instead of keys

cryptoe commented on code in PR #12998:
URL: https://github.com/apache/druid/pull/12998#discussion_r967948716


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollectorSnapshot.java:
##########
@@ -20,26 +20,35 @@
 package org.apache.druid.msq.statistics;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonValue;
+import com.fasterxml.jackson.annotation.JsonProperty;
 
 import java.util.Objects;
 
 public class QuantilesSketchKeyCollectorSnapshot implements KeyCollectorSnapshot
 {
   private final String encodedSketch;
 
+  private final double averageKeyLength;
+
   @JsonCreator
-  public QuantilesSketchKeyCollectorSnapshot(String encodedSketch)
+  public QuantilesSketchKeyCollectorSnapshot(@JsonProperty("encodedSketch") String encodedSketch, @JsonProperty("averageKeyLength") double averageKeyLength)

Review Comment:
   Some serde (serialization/deserialization) tests would be helpful here.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollector.java:
##########
@@ -42,23 +42,29 @@ public class QuantilesSketchKeyCollector implements KeyCollector<QuantilesSketch
 {
   private final Comparator<RowKey> comparator;
   private ItemsSketch<RowKey> sketch;
+  private double averageKeyLength;
 
   QuantilesSketchKeyCollector(
       final Comparator<RowKey> comparator,
-      @Nullable final ItemsSketch<RowKey> sketch
+      @Nullable final ItemsSketch<RowKey> sketch,
+      double averageKeyLength
   )
   {
     this.comparator = comparator;
     this.sketch = sketch;
+    this.averageKeyLength = averageKeyLength;
   }
 
   @Override
   public void add(RowKey key, long weight)
   {
+    double total = averageKeyLength * sketch.getN();
+    total += key.array().length * weight;
     for (int i = 0; i < weight; i++) {
       // Add the same key multiple times to make it "heavier".
       sketch.update(key);

Review Comment:
   Can you please add a comment explaining why we are not updating the total on each sketch update?
   
   



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/DistinctKeyCollector.java:
##########
@@ -120,14 +121,16 @@ public void add(RowKey key, long weight)
       if (isNewMin && !retainedKeys.isEmpty() && !isKeySelected(retainedKeys.firstKey())) {
         // Old min should be kicked out.
         totalWeightUnadjusted -= retainedKeys.removeLong(retainedKeys.firstKey());
+        retainedBytes -= retainedKeys.firstKey().array().length;

Review Comment:
   Should we add a method called `keySize` to the `RowKey` class? It's very hard for the reader to make out from here that `array()` is actually a byte array.



##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/statistics/DelegateOrMinKeyCollectorTest.java:
##########
@@ -78,12 +85,13 @@ public void testAdd()
             QuantilesSketchKeyCollectorFactory.create(clusterBy)
         ).newKeyCollector();
 
-    collector.add(createKey(1L), 1);
+    RowKey key = createKey(1L);
+    collector.add(key, 1);
 
     Assert.assertTrue(collector.getDelegate().isPresent());

Review Comment:
   Let's add a test case for the weighted-average calculation in `addAll`.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollector.java:
##########
@@ -42,23 +42,29 @@ public class QuantilesSketchKeyCollector implements KeyCollector<QuantilesSketch
 {
   private final Comparator<RowKey> comparator;
   private ItemsSketch<RowKey> sketch;
+  private double averageKeyLength;
 
   QuantilesSketchKeyCollector(
       final Comparator<RowKey> comparator,
-      @Nullable final ItemsSketch<RowKey> sketch
+      @Nullable final ItemsSketch<RowKey> sketch,
+      double averageKeyLength
   )
   {
     this.comparator = comparator;
     this.sketch = sketch;
+    this.averageKeyLength = averageKeyLength;
   }
 
   @Override
   public void add(RowKey key, long weight)
   {
+    double total = averageKeyLength * sketch.getN();

Review Comment:
   Let's add a class-level Javadoc describing the merge and addition logic for `averageKeyLength`.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java:
##########
@@ -365,75 +363,75 @@ private BucketHolder getOrCreateBucketHolder(final RowKey bucketKey)
   }
 
   /**
-   * Reduce the number of retained keys by about half, if possible. May reduce by less than that, or keep the
+   * Reduce the number of retained bytes by about half, if possible. May reduce by less than that, or keep the
    * number the same, if downsampling is not possible. (For example: downsampling is not possible if all buckets
    * have been downsampled all the way to one key each.)
    */
   private void downSample()
   {
-    int newTotalRetainedKeys = totalRetainedKeys;
-    final int targetTotalRetainedKeys = totalRetainedKeys / 2;
+    double newTotalRetainedBytes = totalRetainedBytes;
+    final double targetTotalRetainedBytes = totalRetainedBytes / 2;
 
     final List<BucketHolder> sortedHolders = new ArrayList<>(buckets.size());
 
     // Only consider holders with more than one retained key. Holders with a single retained key cannot be downsampled.
     for (final BucketHolder holder : buckets.values()) {
-      if (holder.retainedKeys > 1) {
+      if (holder.keyCollector.estimatedRetainedKeys() > 1) {
         sortedHolders.add(holder);
       }
     }
 
     // Downsample least-dense buckets first. (They're less likely to need high resolution.)
     sortedHolders.sort(
         Comparator.comparing((BucketHolder holder) ->
-                                 (double) holder.keyCollector.estimatedTotalWeight() / holder.retainedKeys)
+                                 (double) holder.keyCollector.estimatedTotalWeight() / holder.retainedBytes)
     );
 
     int i = 0;
-    while (i < sortedHolders.size() && newTotalRetainedKeys > targetTotalRetainedKeys) {
+    while (i < sortedHolders.size() && newTotalRetainedBytes > targetTotalRetainedBytes) {
       final BucketHolder bucketHolder = sortedHolders.get(i);
 
       // Ignore false return, because we wrap all collectors in DelegateOrMinKeyCollector and can be assured that
       // it will downsample all the way to one if needed. Can't do better than that.
       bucketHolder.keyCollector.downSample();
-      newTotalRetainedKeys += bucketHolder.updateRetainedKeys();
+      newTotalRetainedBytes += bucketHolder.updateRetainedBytes();
 
-      if (i == sortedHolders.size() - 1 || sortedHolders.get(i + 1).retainedKeys > bucketHolder.retainedKeys) {
+      if (i == sortedHolders.size() - 1 || sortedHolders.get(i + 1).retainedBytes > bucketHolder.retainedBytes) {
         i++;
       }
     }
 
-    totalRetainedKeys = newTotalRetainedKeys;
+    totalRetainedBytes = newTotalRetainedBytes;
   }
 
   private void assertRetainedKeyCountsAreTrackedCorrectly()
   {
     // Check cached value of retainedKeys in each holder.
     assert buckets.values()
                   .stream()
-                  .allMatch(holder -> holder.retainedKeys == holder.keyCollector.estimatedRetainedKeys());
+                  .allMatch(holder -> holder.retainedBytes == holder.keyCollector.estimatedRetainedBytes());
 
-    // Check cached value of totalRetainedKeys.
-    assert totalRetainedKeys ==
-           buckets.values().stream().mapToInt(holder -> holder.keyCollector.estimatedRetainedKeys()).sum();
+    // Check cached value of totalRetainedBytes.
+    assert totalRetainedBytes ==
+           buckets.values().stream().mapToDouble(holder -> holder.keyCollector.estimatedRetainedBytes()).sum();
   }
 
   private static class BucketHolder
   {
     private final KeyCollector<?> keyCollector;
-    private int retainedKeys;
+    private double retainedBytes;
 
     public BucketHolder(final KeyCollector<?> keyCollector)
     {
       this.keyCollector = keyCollector;
-      this.retainedKeys = keyCollector.estimatedRetainedKeys();
+      this.retainedBytes = keyCollector.estimatedRetainedBytes();
     }
 
-    public int updateRetainedKeys()
+    public double updateRetainedBytes()
     {
-      final int newRetainedKeys = keyCollector.estimatedRetainedKeys();
-      final int difference = newRetainedKeys - retainedKeys;
-      retainedKeys = newRetainedKeys;
+      final double newRetainedKeys = keyCollector.estimatedRetainedBytes();

Review Comment:
   nit: rename the variables here to reflect bytes (e.g. `newRetainedKeys` now holds a byte count).



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java:
##########
@@ -365,75 +363,75 @@ private BucketHolder getOrCreateBucketHolder(final RowKey bucketKey)
   }
 
   /**
-   * Reduce the number of retained keys by about half, if possible. May reduce by less than that, or keep the
+   * Reduce the number of retained bytes by about half, if possible. May reduce by less than that, or keep the
    * number the same, if downsampling is not possible. (For example: downsampling is not possible if all buckets
    * have been downsampled all the way to one key each.)
    */
   private void downSample()
   {
-    int newTotalRetainedKeys = totalRetainedKeys;
-    final int targetTotalRetainedKeys = totalRetainedKeys / 2;
+    double newTotalRetainedBytes = totalRetainedBytes;
+    final double targetTotalRetainedBytes = totalRetainedBytes / 2;
 
     final List<BucketHolder> sortedHolders = new ArrayList<>(buckets.size());
 
     // Only consider holders with more than one retained key. Holders with a single retained key cannot be downsampled.
     for (final BucketHolder holder : buckets.values()) {
-      if (holder.retainedKeys > 1) {
+      if (holder.keyCollector.estimatedRetainedKeys() > 1) {
         sortedHolders.add(holder);
       }
     }
 
     // Downsample least-dense buckets first. (They're less likely to need high resolution.)
     sortedHolders.sort(
         Comparator.comparing((BucketHolder holder) ->
-                                 (double) holder.keyCollector.estimatedTotalWeight() / holder.retainedKeys)
+                                 (double) holder.keyCollector.estimatedTotalWeight() / holder.retainedBytes)

Review Comment:
   Should we use retainedKeys or retainedBytes here? I am not sure.
   
   Consider the scenario where one bucket has 100 keys in total, and each key is, let's say, 5 bytes.
   If we are using a quantiles sketch, we would have 100 / (5 * 100) = 0.20 as the sort weight.
   
   Now let's say another bucket has 10 keys of 4 bytes each.
   Using a quantiles sketch, we would have 10 / (4 * 10) = 0.25 as the sort weight, so the first case would be downsampled first.
   
   Do we want that?
   
   I think we might have to do some calculations before we decide on this. 
   
    
   



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/QuantilesSketchKeyCollector.java:
##########
@@ -42,23 +42,29 @@ public class QuantilesSketchKeyCollector implements KeyCollector<QuantilesSketch
 {
   private final Comparator<RowKey> comparator;
   private ItemsSketch<RowKey> sketch;
+  private double averageKeyLength;
 
   QuantilesSketchKeyCollector(
       final Comparator<RowKey> comparator,
-      @Nullable final ItemsSketch<RowKey> sketch
+      @Nullable final ItemsSketch<RowKey> sketch,
+      double averageKeyLength
   )
   {
     this.comparator = comparator;
     this.sketch = sketch;
+    this.averageKeyLength = averageKeyLength;
   }
 
   @Override
   public void add(RowKey key, long weight)
   {
+    double total = averageKeyLength * sketch.getN();

Review Comment:
   nit: rename `total` to `estimatedTotalSketchSizeInBytes`.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsCollectorImpl.java:
##########
@@ -365,75 +363,75 @@ private BucketHolder getOrCreateBucketHolder(final RowKey bucketKey)
   }
 
   /**
-   * Reduce the number of retained keys by about half, if possible. May reduce by less than that, or keep the
+   * Reduce the number of retained bytes by about half, if possible. May reduce by less than that, or keep the
    * number the same, if downsampling is not possible. (For example: downsampling is not possible if all buckets
    * have been downsampled all the way to one key each.)
    */
   private void downSample()
   {
-    int newTotalRetainedKeys = totalRetainedKeys;
-    final int targetTotalRetainedKeys = totalRetainedKeys / 2;
+    double newTotalRetainedBytes = totalRetainedBytes;
+    final double targetTotalRetainedBytes = totalRetainedBytes / 2;
 
     final List<BucketHolder> sortedHolders = new ArrayList<>(buckets.size());
 
     // Only consider holders with more than one retained key. Holders with a single retained key cannot be downsampled.
     for (final BucketHolder holder : buckets.values()) {
-      if (holder.retainedKeys > 1) {
+      if (holder.keyCollector.estimatedRetainedKeys() > 1) {
         sortedHolders.add(holder);
       }
     }
 
     // Downsample least-dense buckets first. (They're less likely to need high resolution.)
     sortedHolders.sort(
         Comparator.comparing((BucketHolder holder) ->
-                                 (double) holder.keyCollector.estimatedTotalWeight() / holder.retainedKeys)
+                                 (double) holder.keyCollector.estimatedTotalWeight() / holder.retainedBytes)
     );
 
     int i = 0;
-    while (i < sortedHolders.size() && newTotalRetainedKeys > targetTotalRetainedKeys) {
+    while (i < sortedHolders.size() && newTotalRetainedBytes > targetTotalRetainedBytes) {
       final BucketHolder bucketHolder = sortedHolders.get(i);
 
       // Ignore false return, because we wrap all collectors in DelegateOrMinKeyCollector and can be assured that
       // it will downsample all the way to one if needed. Can't do better than that.
       bucketHolder.keyCollector.downSample();
-      newTotalRetainedKeys += bucketHolder.updateRetainedKeys();
+      newTotalRetainedBytes += bucketHolder.updateRetainedBytes();
 
-      if (i == sortedHolders.size() - 1 || sortedHolders.get(i + 1).retainedKeys > bucketHolder.retainedKeys) {
+      if (i == sortedHolders.size() - 1 || sortedHolders.get(i + 1).retainedBytes > bucketHolder.retainedBytes) {
         i++;
       }
     }
 
-    totalRetainedKeys = newTotalRetainedKeys;
+    totalRetainedBytes = newTotalRetainedBytes;
   }
 
   private void assertRetainedKeyCountsAreTrackedCorrectly()

Review Comment:
   nit: rename this method to reflect that it now checks retained bytes rather than retained key counts.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org