Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/12/15 15:46:03 UTC

[GitHub] [druid] kfaraz opened a new pull request #12073: [WIP] Update Aggregator and AggregatorFactory interfaces to fix mem estimates

kfaraz opened a new pull request #12073:
URL: https://github.com/apache/druid/pull/12073


   Fixes #12022  
   
   ### Description
   
   The existing implementation in `OnHeapIncrementalIndex` tends to over-estimate memory usage,
   which leads to more persistence cycles than necessary during ingestion. A more accurate estimate
   of memory usage would also free up memory for other purposes.
   
   The current estimation essentially takes the maximum per-row size of each aggregator and multiplies it by the number of rows to get the total estimated size. This PR replaces that max-times-rows calculation with the actual incremental memory used by an aggregator on each row, i.e. on each invocation of `aggregate`.
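
To make the difference between the two strategies concrete, here is a minimal sketch with invented numbers. The class `EstimateComparison`, the 1024-byte maximum, and the per-call deltas are assumptions for illustration only and are not taken from Druid.

```java
// Hypothetical illustration only; the sizes below are invented and are not Druid's.
final class EstimateComparison
{
  /** Max-based style: assume every row costs the aggregator's worst-case size. */
  static long maxBasedEstimate(long maxBytesPerRow, int numRows)
  {
    return maxBytesPerRow * numRows;
  }

  /** Incremental style: start from the initial size and add the growth reported per call. */
  static long incrementalEstimate(long initialBytes, long[] perCallDeltas)
  {
    long total = initialBytes;
    for (long delta : perCallDeltas) {
      total += delta;
    }
    return total;
  }

  public static void main(String[] args)
  {
    // Five aggregate calls, most of which allocate nothing new.
    long[] deltas = {64, 0, 0, 128, 0};
    System.out.println("max-based:   " + maxBasedEstimate(1024, deltas.length));
    System.out.println("incremental: " + incrementalEstimate(32, deltas));
  }
}
```

With these made-up inputs the max-based figure is more than twenty times the incremental one, which is the kind of gap that would trigger early persists.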
   
   <hr>
   
   ### Changes
   - Add method `AggregatorFactory.factorizeWithSize()` that returns a `SizedAggregator`
     - `SizedAggregator` contains the aggregator instance and a long representing the initial memory in bytes used by the aggregator.
   - Add method `Aggregator.aggregateWithSize()` which returns a long representing the incremental memory used by the aggregator in that invocation. The default impl of `aggregateWithSize()` calls `aggregate()` and returns 0.
   - Remove the method `DimensionIndexer.estimateEncodedKeyComponentSize()`
   - Update the method `DimensionIndexer.getUnsortedEncodedValueFromSorted()` to return generic class `EncodedDimensionValue<EncodedType>` which contains:
       - `EncodedType value`: e.g. int[] for StringDimIndexer, Long for LongDimIndexer
       - `long incrementalSize`: The delta in size required for the value.
   - Update `OnHeapIncrementalIndex` to use the new estimations only if `estimateMaxMemory` is false.
   <hr>
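
The aggregator-side changes listed above could look roughly like the following. This is a simplified sketch, not the Druid source: the `Aggregator` interface here is a stand-in with only the two relevant methods, and `DistinctLongAggregator`, `setNextValue`, and `BYTES_PER_ENTRY` are hypothetical names introduced for illustration.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified stand-in for the interface described above; not the Druid source.
interface Aggregator
{
  void aggregate();

  /** Default behaviour as described: perform the aggregation and report no growth. */
  default long aggregateWithSize()
  {
    aggregate();
    return 0;
  }
}

/** Holder for an aggregator and its initial size in bytes. */
final class SizedAggregator
{
  private final Aggregator aggregator;
  private final long initialSizeBytes;

  SizedAggregator(Aggregator aggregator, long initialSizeBytes)
  {
    this.aggregator = aggregator;
    this.initialSizeBytes = initialSizeBytes;
  }

  Aggregator getAggregator()
  {
    return aggregator;
  }

  long getInitialSizeBytes()
  {
    return initialSizeBytes;
  }
}

/** Hypothetical aggregator whose footprint grows only when a new distinct value arrives. */
final class DistinctLongAggregator implements Aggregator
{
  // Assumed cost of one boxed entry in the set; an illustrative constant only.
  static final long BYTES_PER_ENTRY = 48;

  private final Set<Long> seen = new HashSet<>();
  private long nextValue;

  void setNextValue(long value)
  {
    this.nextValue = value;
  }

  @Override
  public void aggregate()
  {
    seen.add(nextValue);
  }

  @Override
  public long aggregateWithSize()
  {
    // Set.add returns true only if the value was not already present.
    return seen.add(nextValue) ? BYTES_PER_ENTRY : 0;
  }
}
```

A caller would start from `getInitialSizeBytes()` and add the return value of each `aggregateWithSize()` call to maintain a running estimate.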
   
   This PR has:
   - [ ] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] clintropolis commented on a change in pull request #12073: Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
clintropolis commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r791614256



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java
##########
@@ -69,6 +69,24 @@ public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFact
     throw new UOE("Aggregator[%s] cannot vectorize", getClass().getName());
   }
 
+  /**
+   * Creates an {@link Aggregator} based on the provided column selector factory.
+   * The returned value is a holder object which contains both the aggregator
+   * and its initial size in bytes. The callers can then invoke
+   * {@link Aggregator#aggregateWithSize()} to perform aggregation and get back
+   * the incremental memory required in each aggregate call. Combined with the
+   * initial size, this gives the total on-heap memory required by the aggregator.
+   *
+   * This flow does not require invoking {@link #guessAggregatorHeapFootprint(long)}
+   * which tends to over-estimate the required memory.
+   *
+   * @return AggregatorAndSize which contains the actual aggregator and its initial size.
+   */
+  public AggregatorAndSize factorizeWithSize(ColumnSelectorFactory metricFactory)

Review comment:
       same question about the contract for the returned sizes. Also, I wonder if there is anything we could do to make sure this method is overridden if `aggregateWithSize` is implemented, so that the initial size is not the max size...

##########
File path: processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
##########
@@ -60,20 +60,29 @@ private static String emptyToNullIfNeeded(@Nullable Object o)
   private final MultiValueHandling multiValueHandling;
   private final boolean hasBitmapIndexes;
   private final boolean hasSpatialIndexes;
+  private final boolean useMaxMemoryEstimates;
   private volatile boolean hasMultipleValues = false;
 
-  public StringDimensionIndexer(MultiValueHandling multiValueHandling, boolean hasBitmapIndexes, boolean hasSpatialIndexes)
+  public StringDimensionIndexer(
+      MultiValueHandling multiValueHandling,
+      boolean hasBitmapIndexes,
+      boolean hasSpatialIndexes,
+      boolean useMaxMemoryEstimates
+  )
   {
+    super(useMaxMemoryEstimates ? new DimensionDictionary<>() : new StringDimensionDictionary());

Review comment:
       nit: this seems strange/confusing, why wouldn't `StringDimensionIndexer` always use `StringDimensionDictionary` here? it seems like you could just pass in a value of `false` to control the value of `computeOnHeapSize` instead of sometimes not using `StringDimensionDictionary`

##########
File path: processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
##########
@@ -60,20 +60,29 @@ private static String emptyToNullIfNeeded(@Nullable Object o)
   private final MultiValueHandling multiValueHandling;
   private final boolean hasBitmapIndexes;
   private final boolean hasSpatialIndexes;
+  private final boolean useMaxMemoryEstimates;
   private volatile boolean hasMultipleValues = false;
 
-  public StringDimensionIndexer(MultiValueHandling multiValueHandling, boolean hasBitmapIndexes, boolean hasSpatialIndexes)
+  public StringDimensionIndexer(
+      MultiValueHandling multiValueHandling,
+      boolean hasBitmapIndexes,
+      boolean hasSpatialIndexes,
+      boolean useMaxMemoryEstimates
+  )
   {
+    super(useMaxMemoryEstimates ? new DimensionDictionary<>() : new StringDimensionDictionary());
     this.multiValueHandling = multiValueHandling == null ? MultiValueHandling.ofDefault() : multiValueHandling;
     this.hasBitmapIndexes = hasBitmapIndexes;
     this.hasSpatialIndexes = hasSpatialIndexes;
+    this.useMaxMemoryEstimates = useMaxMemoryEstimates;
   }
 
   @Override
-  public int[] processRowValsToUnsortedEncodedKeyComponent(@Nullable Object dimValues, boolean reportParseExceptions)
+  public EncodedKeyComponent<int[]> processRowValsToUnsortedEncodedKeyComponent(@Nullable Object dimValues, boolean reportParseExceptions)
   {
     final int[] encodedDimensionValues;
     final int oldDictSize = dimLookup.size();
+    final long oldDictSizeInBytes = useMaxMemoryEstimates ? 0 : dimLookup.sizeInBytes();

Review comment:
       nit: this is only used inside of the else near the end of the method

##########
File path: extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
##########
@@ -21,23 +21,43 @@
 
 import org.apache.datasketches.Family;
 import org.apache.datasketches.theta.SetOperation;
+import org.apache.datasketches.theta.Sketch;
 import org.apache.datasketches.theta.Union;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.segment.BaseObjectColumnValueSelector;
 
 import javax.annotation.Nullable;
+import java.lang.reflect.Field;
 import java.util.List;
 
 public class SketchAggregator implements Aggregator
 {
+
   private final BaseObjectColumnValueSelector selector;
   private final int size;
 
   @Nullable
   private Union union;
 
+  @Nullable
+  private Sketch sketch;
+
+  @Nullable
+  private static final Field SKETCH_FIELD;
+
+  static {
+    try {
+      SKETCH_FIELD = Class.forName("org.apache.datasketches.theta.UnionImpl")
+                          .getDeclaredField("gadget_");
+      SKETCH_FIELD.setAccessible(true);
+    }
+    catch (NoSuchFieldException | ClassNotFoundException e) {
+      throw new ISE(e, "Could not initialize SketchAggregator");
+    }

Review comment:
       this seems worth a comment, and maybe a link to the code?
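
A commented version of this pattern might read as follows. It is a sketch under stated assumptions: `OpaqueUnion` is a hypothetical stand-in for the third-party class, and the field name `gadget` is invented here rather than taken from the DataSketches library.

```java
import java.lang.reflect.Field;

// Hypothetical stand-in for a third-party class that keeps its state private.
final class OpaqueUnion
{
  private final long[] gadget = new long[16];
}

final class ReflectiveSizeReader
{
  // The field is private in the (hypothetical) library and has no accessor,
  // so reflection is used to reach it. Resolving it once in a static
  // initializer means class loading fails immediately if a library upgrade
  // renames or removes the field, instead of failing on some later row.
  private static final Field GADGET_FIELD;

  static {
    try {
      GADGET_FIELD = OpaqueUnion.class.getDeclaredField("gadget");
      GADGET_FIELD.setAccessible(true);
    }
    catch (NoSuchFieldException e) {
      throw new IllegalStateException("Could not locate field 'gadget'", e);
    }
  }

  /** Reads the hidden array and returns its length. */
  static int gadgetLength(OpaqueUnion union)
  {
    try {
      return ((long[]) GADGET_FIELD.get(union)).length;
    }
    catch (IllegalAccessException e) {
      throw new IllegalStateException("Field 'gadget' is not accessible", e);
    }
  }
}
```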

##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/Aggregator.java
##########
@@ -42,6 +42,12 @@
 {
   void aggregate();
 
+  default long aggregateWithSize()

Review comment:
       what is the contract of this value, I don't see the word estimate in here, but think it probably should be... should implementors over-estimate if exact sizing is not possible or is under-estimating fine? Should there be a warning that the default estimate is used? (i imagine this would be very noisy if it is done per aggregate call... so don't really recommend doing it here or anything...)






[GitHub] [druid] clintropolis commented on a change in pull request #12073: Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
clintropolis commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r783009496



##########
File path: processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java
##########
@@ -160,4 +173,16 @@ public int getIdForNull()
       lock.readLock().unlock();
     }
   }
+
+  private long getObjectSize(@Nonnull T object)

Review comment:
       hmm, this method is presumptuous and breaks the contract of this class being generic. I think a size estimator function should be passed into this method, it needs to be public so that callers can override it, or maybe it should be abstract and some `StringDimensionDictionary` should be implemented to minimize function calls, since it's going to be a pretty hot method
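
One way to read the subclass suggestion is sketched below. It is a simplified, single-threaded stand-in: `SimpleDictionary`, `SimpleStringDictionary`, `estimateSizeOfValue`, and the 40-byte overhead are all hypothetical and do not reproduce Druid's `DimensionDictionary`.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, single-threaded stand-in for a generic value dictionary; not Druid's class.
class SimpleDictionary<T>
{
  private final Map<T, Integer> ids = new HashMap<>();
  private long sizeInBytes;

  /** Returns the id for the value, adding its estimated size if it is new. */
  int add(T value)
  {
    Integer existing = ids.get(value);
    if (existing != null) {
      return existing;
    }
    int id = ids.size();
    ids.put(value, id);
    sizeInBytes += estimateSizeOfValue(value);
    return id;
  }

  long sizeInBytes()
  {
    return sizeInBytes;
  }

  /** The generic base knows nothing about T, so it reports 0; subclasses override. */
  long estimateSizeOfValue(T value)
  {
    return 0;
  }
}

final class SimpleStringDictionary extends SimpleDictionary<String>
{
  @Override
  long estimateSizeOfValue(String value)
  {
    // Rough assumption: 2 bytes per char plus a fixed 40-byte object overhead.
    return 2L * value.length() + 40;
  }
}
```

Keeping the estimator as an overridable method rather than a passed-in function avoids an extra indirection on a hot path, while the base class stays generic.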

##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/AggregatorAndSize.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+/**
+ * Encapsulates an {@link Aggregator} and the initial size in bytes required by
+ * the Aggregator.
+ */
+public class AggregatorAndSize
+{
+
+  // TODO: include default overhead for object sizes

Review comment:
       nit: unresolved todo

##########
File path: processing/src/main/java/org/apache/druid/segment/DimensionIndexer.java
##########
@@ -127,7 +127,7 @@
    * @return An array containing an encoded representation of the input row value.
    */
   @Nullable
-  EncodedKeyComponentType processRowValsToUnsortedEncodedKeyComponent(@Nullable Object dimValues, boolean reportParseExceptions);
+  EncodedDimensionValue<EncodedKeyComponentType> processRowValsToUnsortedEncodedKeyComponent(@Nullable Object dimValues, boolean reportParseExceptions);

Review comment:
       nit: javadoc params and return type need updating.. but this isn't really new, it basically needed updating long ago, it is not always an array 😅

##########
File path: server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
##########
@@ -271,6 +271,7 @@ private Sink getSink(long timestamp)
           config.getAppendableIndexSpec(),
           config.getMaxRowsInMemory(),
           config.getMaxBytesInMemoryOrDefault(),
+          true,

Review comment:
       i've noticed a few places that aren't wired up to config are using true, so using the new behavior with no way out, is that intentional? this one in particular probably doesn't matter all that much these days (i hope), but i'm less sure about all of them, and it isn't consistent because i see some hard coded false in there too

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
##########
@@ -557,12 +573,17 @@ IncrementalIndexRowResult toIncrementalIndexRow(InputRow row)
         DimensionIndexer indexer = desc.getIndexer();
         Object dimsKey = null;
         try {
-          dimsKey = indexer.processRowValsToUnsortedEncodedKeyComponent(row.getRaw(dimension), true);
+          final EncodedDimensionValue<?> encodedDimensionValue
+              = indexer.processRowValsToUnsortedEncodedKeyComponent(row.getRaw(dimension), true);

Review comment:
       i guess due to the way this refactor was done (compared to aggs) there is no real way to turn off calculating the estimates, even if we aren't using them. maybe it doesn't matter if there is no/minimal performance impact






[GitHub] [druid] kfaraz commented on a change in pull request #12073: [WIP] Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r777843763



##########
File path: extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
##########
@@ -21,23 +21,45 @@
 
 import org.apache.datasketches.Family;
 import org.apache.datasketches.theta.SetOperation;
+import org.apache.datasketches.theta.Sketch;
 import org.apache.datasketches.theta.Union;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.segment.BaseObjectColumnValueSelector;
 
 import javax.annotation.Nullable;
+import java.lang.reflect.Field;
 import java.util.List;
 
 public class SketchAggregator implements Aggregator
 {
+  private static final Logger LOG = new Logger(SketchAggregator.class);
+
   private final BaseObjectColumnValueSelector selector;
   private final int size;
 
   @Nullable
   private Union union;
 
+  @Nullable
+  private Sketch sketch;
+
+  @Nullable
+  private static Field sketchField;
+  static {
+    try {
+      sketchField = Class.forName("org.apache.datasketches.theta.UnionImpl")
+                         .getDeclaredField("gadget_");
+      sketchField.setAccessible(true);
+    }
+    catch (NoSuchFieldException | ClassNotFoundException e) {
+      LOG.error(e, "Could not initialize 'sketchField'");

Review comment:
       Okay. For now, we can just fail loudly. The additional config can be done as a follow up.






[GitHub] [druid] kfaraz commented on a change in pull request #12073: [WIP] Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r782359628



##########
File path: extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
##########
@@ -64,6 +86,25 @@ public void aggregate()
     }
   }
 
+  @Override
+  public long aggregateWithSize()

Review comment:
       Re-structured to remove dependence between the methods `getInitialSizeBytes()` and `aggregateWithSize()`






[GitHub] [druid] kfaraz commented on pull request #12073: Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
kfaraz commented on pull request #12073:
URL: https://github.com/apache/druid/pull/12073#issuecomment-1014184047


   > You should also document the approach that was finalized in the proposal and this implementation is based on. 
   
   @abhishekagarwal87 , I have added an overview of the approach in `OnHeapIncrementalIndex`.
   I have also added javadocs for the methods in `Aggregator` and `AggregatorFactory`, but they contain information only relevant to those methods.
   
   Please let me know if this is sufficient.
   
   




[GitHub] [druid] lgtm-com[bot] commented on pull request #12073: [WIP] Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #12073:
URL: https://github.com/apache/druid/pull/12073#issuecomment-1010232910


   This pull request **introduces 3 alerts** when merging 348f17eb388b24644e2775201a416df25cebec19 into eb0bae49eca5013c7121994e20f9cd8146df8424 - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-83ec0295e46c0edc2c4bb3917aef680b8d037522)
   
   **new alerts:**
   
   * 2 for Unused format argument
   * 1 for Result of multiplication cast to wider type
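
For context on the last alert: in Java, multiplying two `int` values is done in 32-bit arithmetic even when the result is assigned to a `long`, so the product can wrap before it is widened. The sketch below uses a hypothetical class and made-up operands; it does not show the code that LGTM flagged in this PR.

```java
// Hypothetical example of the pattern behind the alert; not the flagged Druid code.
final class WideningExample
{
  static long overflowing(int rows, int bytesPerRow)
  {
    // int * int is evaluated in 32 bits and may wrap before widening to long.
    return rows * bytesPerRow;
  }

  static long widened(int rows, int bytesPerRow)
  {
    // Casting one operand first makes the multiplication happen in 64 bits.
    return (long) rows * bytesPerRow;
  }

  public static void main(String[] args)
  {
    System.out.println(overflowing(100_000, 100_000));
    System.out.println(widened(100_000, 100_000));
  }
}
```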




[GitHub] [druid] kfaraz commented on a change in pull request #12073: Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r796310048



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/Aggregator.java
##########
@@ -42,6 +42,12 @@
 {
   void aggregate();
 
+  default long aggregateWithSize()

Review comment:
       > Should there be a warning that the default estimate is used?
   
   It would make sense to give this warning when `factorizeWithSize` is overridden but `aggregateWithSize` is not. In such a case, we might be significantly underestimating the memory usage.
   
   As you said, doing it here might be noisy. A viable approach could be to have `factorizeWithSize` return a wrapper Aggregator which does not allow the regular `aggregate` (will be addressed in the subsequent PR).
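
A rough sketch of what such a wrapper could look like is given below. This is only one possible reading of the idea; `SizeOnlyAggregator` is a hypothetical name, the `Aggregator` interface is a two-method stand-in, and the follow-up PR may well take a different shape.

```java
// Simplified stand-in for the interface under discussion; not the Druid source.
interface Aggregator
{
  void aggregate();

  default long aggregateWithSize()
  {
    aggregate();
    return 0;
  }
}

/** Hypothetical wrapper that forces callers onto the size-reporting path. */
final class SizeOnlyAggregator implements Aggregator
{
  private final Aggregator delegate;

  SizeOnlyAggregator(Aggregator delegate)
  {
    this.delegate = delegate;
  }

  @Override
  public void aggregate()
  {
    // Rejecting the plain call makes accidental untracked aggregation visible.
    throw new UnsupportedOperationException("Use aggregateWithSize() so memory growth is tracked");
  }

  @Override
  public long aggregateWithSize()
  {
    return delegate.aggregateWithSize();
  }
}
```

Note that this wrapper alone does not detect a delegate that relies on the default `aggregateWithSize()`; it only guarantees that callers go through the size-reporting method.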






[GitHub] [druid] kfaraz commented on a change in pull request #12073: Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r785662541



##########
File path: processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java
##########
@@ -160,4 +173,16 @@ public int getIdForNull()
       lock.readLock().unlock();
     }
   }
+
+  private long getObjectSize(@Nonnull T object)

Review comment:
       Used the `StringDimensionDictionary` suggestion, although I have not made it abstract so that implementations using the `DimensionDictionary` can continue to use it as the base concrete class.






[GitHub] [druid] kfaraz commented on a change in pull request #12073: Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r785746009



##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
##########
@@ -557,12 +573,17 @@ IncrementalIndexRowResult toIncrementalIndexRow(InputRow row)
         DimensionIndexer indexer = desc.getIndexer();
         Object dimsKey = null;
         try {
-          dimsKey = indexer.processRowValsToUnsortedEncodedKeyComponent(row.getRaw(dimension), true);
+          final EncodedDimensionValue<?> encodedDimensionValue
+              = indexer.processRowValsToUnsortedEncodedKeyComponent(row.getRaw(dimension), true);

Review comment:
       Updated `DimensionDictionary` and `StringDimensionIndexer` to turn off estimations if not needed.
   






[GitHub] [druid] kfaraz commented on a change in pull request #12073: Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r785662074



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/AggregatorAndSize.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+/**
+ * Encapsulates an {@link Aggregator} and the initial size in bytes required by
+ * the Aggregator.
+ */
+public class AggregatorAndSize
+{
+
+  // TODO: include default overhead for object sizes

Review comment:
       Addressed in the caller.







[GitHub] [druid] lgtm-com[bot] commented on pull request #12073: Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #12073:
URL: https://github.com/apache/druid/pull/12073#issuecomment-1014190344


   This pull request **introduces 1 alert** when merging 9908c0baa918b03b61f570329e0c412a404448db into b55f7a25febec8979263e4edf1d8b4ec7692f968 - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-7931d450869b2fdda439260cfff5c12b0c361fc7)
   
   **new alerts:**
   
   * 1 for Result of multiplication cast to wider type




[GitHub] [druid] kfaraz commented on a change in pull request #12073: Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r783081365



##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
##########
@@ -557,12 +573,17 @@ IncrementalIndexRowResult toIncrementalIndexRow(InputRow row)
         DimensionIndexer indexer = desc.getIndexer();
         Object dimsKey = null;
         try {
-          dimsKey = indexer.processRowValsToUnsortedEncodedKeyComponent(row.getRaw(dimension), true);
+          final EncodedDimensionValue<?> encodedDimensionValue
+              = indexer.processRowValsToUnsortedEncodedKeyComponent(row.getRaw(dimension), true);

Review comment:
       Yes, I was not too sure about this either. I will take another look and see if we can separate the two flows without too much duplication.






[GitHub] [druid] kfaraz commented on a change in pull request #12073: Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r783080415



##########
File path: server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
##########
@@ -271,6 +271,7 @@ private Sink getSink(long timestamp)
           config.getAppendableIndexSpec(),
           config.getMaxRowsInMemory(),
           config.getMaxBytesInMemoryOrDefault(),
+          true,

Review comment:
       The value of the flag `useMaxMemoryEstimates = true` represents old behaviour.
   
   The hard coding has been done only for the following classes:
   - `RealtimePlumber` and related classes (hopefully not used anymore)
   - `OnHeapIncrementalIndexBenchmark` (accidentally hardcoded to false, fixing this)
   
   It should not be hard-coded anywhere else except maybe tests.






[GitHub] [druid] kfaraz commented on a change in pull request #12073: Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r785396124



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/Aggregator.java
##########
@@ -42,6 +42,12 @@
 {
   void aggregate();
 
+  default long aggregateWithSize()

Review comment:
       Thanks for the reminder, @abhishekagarwal87. I am adding javadocs wherever missing.
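
For context, here is a minimal sketch of what a documented default could look like, based only on the PR description (the default implementation calls `aggregate()` and returns 0). `SizeAwareAggregator` and `CountingAggregator` are hypothetical stand-ins for illustration, not Druid classes, and the javadoc wording is an assumption.

```java
class AggregateWithSizeSketch
{
  /** Hypothetical stand-in for the Aggregator interface, reduced to two methods. */
  interface SizeAwareAggregator
  {
    void aggregate();

    /**
     * Performs the aggregation and returns the additional on-heap memory, in
     * bytes, needed by this call. The default reports no growth.
     */
    default long aggregateWithSize()
    {
      aggregate();
      return 0;
    }
  }

  /** Fixed-size aggregator that relies on the default and never reports growth. */
  static class CountingAggregator implements SizeAwareAggregator
  {
    long count;

    @Override
    public void aggregate()
    {
      count++;
    }
  }

  public static void main(String[] args)
  {
    CountingAggregator agg = new CountingAggregator();
    long delta = agg.aggregateWithSize();
    System.out.println("count=" + agg.count + ", delta=" + delta);
  }
}
```

Aggregators whose footprint does not change per row can keep the default; only those that grow (sketches, sets) would need to override it.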






[GitHub] [druid] kfaraz commented on a change in pull request #12073: [WIP] Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r782359195



##########
File path: extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
##########
@@ -133,4 +174,34 @@ static void updateUnion(Union union, Object update)
       throw new ISE("Illegal type received while theta sketch merging [%s]", update.getClass());
     }
   }
+
+  /**
+   * Gets the initial size of this aggregator in bytes.
+   */
+  public long getInitialSizeBytes()
+  {
+    // SketchAggregator has 3 references and an int
+    // UnionImpl has a reference, a short, a long, a boolean
+    long sizeOfReferences = 3L * Long.BYTES + Integer.BYTES
+                            + Long.BYTES + Short.BYTES + Long.BYTES + 1;
+    if (sketchField == null) {
+      return sizeOfReferences;
+    }
+
+    // Initialize the sketch if not already initialized
+    if (union == null) {
+      initUnion();

Review comment:
       Re-structured the code to initialize lazily and account for the size only when initializing.
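
A rough sketch of the restructuring described above: the backing structure is created on first use, and its footprint is reported only by the call that creates it. `LazySetAggregator`, the `HashSet` standing in for the theta `Union`, and both byte constants are illustrative assumptions, not the actual `SketchAggregator` code.

```java
import java.util.HashSet;
import java.util.Set;

class LazyInitSketch
{
  static class LazySetAggregator
  {
    // Illustrative constants only; real footprints depend on the JVM and the structure used.
    static final long ASSUMED_INITIAL_BYTES = 64;
    static final long ASSUMED_BYTES_PER_ENTRY = 32;

    // Created lazily; stands in for the sketch union.
    private Set<String> values;

    /** Adds a value and returns the estimated extra bytes required by this call. */
    long aggregateWithSize(String value)
    {
      long delta = 0;
      if (values == null) {
        values = new HashSet<>();
        delta += ASSUMED_INITIAL_BYTES; // counted once, at initialization
      }
      if (value != null && values.add(value)) {
        delta += ASSUMED_BYTES_PER_ENTRY; // counted only for entries not seen before
      }
      return delta;
    }
  }

  public static void main(String[] args)
  {
    LazySetAggregator agg = new LazySetAggregator();
    System.out.println(agg.aggregateWithSize("a"));
    System.out.println(agg.aggregateWithSize("a"));
    System.out.println(agg.aggregateWithSize("b"));
  }
}
```

With this shape, an aggregator that never receives a row contributes nothing beyond its fixed fields, and the initialization cost cannot be counted twice.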






[GitHub] [druid] kfaraz commented on a change in pull request #12073: Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r796301816



##########
File path: processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
##########
@@ -60,20 +60,29 @@ private static String emptyToNullIfNeeded(@Nullable Object o)
   private final MultiValueHandling multiValueHandling;
   private final boolean hasBitmapIndexes;
   private final boolean hasSpatialIndexes;
+  private final boolean useMaxMemoryEstimates;
   private volatile boolean hasMultipleValues = false;
 
-  public StringDimensionIndexer(MultiValueHandling multiValueHandling, boolean hasBitmapIndexes, boolean hasSpatialIndexes)
+  public StringDimensionIndexer(
+      MultiValueHandling multiValueHandling,
+      boolean hasBitmapIndexes,
+      boolean hasSpatialIndexes,
+      boolean useMaxMemoryEstimates
+  )
   {
+    super(useMaxMemoryEstimates ? new DimensionDictionary<>() : new StringDimensionDictionary());

Review comment:
       You are right, it is weird.
   Fixed it.






[GitHub] [druid] kfaraz commented on a change in pull request #12073: Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r783080415



##########
File path: server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
##########
@@ -271,6 +271,7 @@ private Sink getSink(long timestamp)
           config.getAppendableIndexSpec(),
           config.getMaxRowsInMemory(),
           config.getMaxBytesInMemoryOrDefault(),
+          true,

Review comment:
       The value of the flag `useMaxMemoryEstimates = true` represents old behaviour.
   
   The hard coding has been done only for the following classes:
   - `RealtimePlumber` and related classes (hopefully not used anymore)
   - `OnHeapIncrementalIndexBenchmark` (accidentally hardcoded to false, fixing this)
   
   It should NOT be hardcoded to false in any of these cases.






[GitHub] [druid] kfaraz commented on a change in pull request #12073: Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r783081365



##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
##########
@@ -557,12 +573,17 @@ IncrementalIndexRowResult toIncrementalIndexRow(InputRow row)
         DimensionIndexer indexer = desc.getIndexer();
         Object dimsKey = null;
         try {
-          dimsKey = indexer.processRowValsToUnsortedEncodedKeyComponent(row.getRaw(dimension), true);
+          final EncodedDimensionValue<?> encodedDimensionValue
+              = indexer.processRowValsToUnsortedEncodedKeyComponent(row.getRaw(dimension), true);

Review comment:
       Yes, I was not too sure about this either. I will take another look and see if we can separate the two flows without too much duplication.






[GitHub] [druid] imply-cheddar commented on a change in pull request #12073: Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
imply-cheddar commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r782717538



##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
##########
@@ -478,6 +479,11 @@ public IncrementalIndexAddResult add(InputRow row) throws IndexSizeExceededExcep
     return add(row, false);
   }
 
+  public IncrementalIndexAddResult add(InputRow row, boolean skipMaxRowsInMemoryCheck) throws IndexSizeExceededException
+  {
+    return add(row, false, true);

Review comment:
       This appears to be ignoring `skipMaxRowsInMemoryCheck`; is that intentional? If it is, it is probably worth a comment explaining why.
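
If dropping the flag is not intentional, the two-argument overload would presumably forward it. A minimal sketch with a hypothetical `RowBuffer` class follows; the meaning of the third argument is not visible in this diff, so it is kept as an opaque boolean named `otherFlag`.

```java
class AddOverloadSketch
{
  /** Hypothetical stand-in for the index; it only records which flag it received. */
  static class RowBuffer
  {
    boolean lastSkipMaxRowsInMemoryCheck;

    int add(String row)
    {
      return add(row, false);
    }

    int add(String row, boolean skipMaxRowsInMemoryCheck)
    {
      // Forward the caller's flag instead of a hard-coded false.
      return add(row, skipMaxRowsInMemoryCheck, true);
    }

    // The third argument is an assumption; the diff does not show what it controls.
    int add(String row, boolean skipMaxRowsInMemoryCheck, boolean otherFlag)
    {
      lastSkipMaxRowsInMemoryCheck = skipMaxRowsInMemoryCheck;
      return 1; // pretend exactly one row was added
    }
  }

  public static void main(String[] args)
  {
    RowBuffer buffer = new RowBuffer();
    buffer.add("row", true);
    System.out.println("flag seen by the three-argument overload: " + buffer.lastSkipMaxRowsInMemoryCheck);
  }
}
```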






[GitHub] [druid] kfaraz commented on a change in pull request #12073: [WIP] Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r782360092



##########
File path: extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
##########
@@ -21,23 +21,45 @@
 
 import org.apache.datasketches.Family;
 import org.apache.datasketches.theta.SetOperation;
+import org.apache.datasketches.theta.Sketch;
 import org.apache.datasketches.theta.Union;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.segment.BaseObjectColumnValueSelector;
 
 import javax.annotation.Nullable;
+import java.lang.reflect.Field;
 import java.util.List;
 
 public class SketchAggregator implements Aggregator
 {
+  private static final Logger LOG = new Logger(SketchAggregator.class);
+
   private final BaseObjectColumnValueSelector selector;
   private final int size;
 
   @Nullable
   private Union union;
 
+  @Nullable
+  private Sketch sketch;
+
+  @Nullable
+  private static Field sketchField;

Review comment:
       Updated.






[GitHub] [druid] kfaraz commented on a change in pull request #12073: [WIP] Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r782358659



##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
##########
@@ -225,13 +236,19 @@ protected AddToFactsResult addToFacts(
    * </ul>
    *
    * @param key                          TimeAndDims key
-   * @param maxBytesPerRowForAggregators max size per aggregator
-   *
+   * @param maxBytesPerRowForAggregators max size per row for aggregators
+   * @param actualRowSizeForAggregators  actual aggregator size for this row
    * @return estimated size of row
    */
-  private long estimateRowSizeInBytes(IncrementalIndexRow key, long maxBytesPerRowForAggregators)
+  private long estimateRowSizeInBytes(
+      IncrementalIndexRow key,
+      long maxBytesPerRowForAggregators,
+      long actualRowSizeForAggregators
+  )
   {
-    return ROUGH_OVERHEAD_PER_MAP_ENTRY + key.estimateBytesInMemory() + maxBytesPerRowForAggregators;
+    return ROUGH_OVERHEAD_PER_MAP_ENTRY
+           + key.estimateBytesInMemory()
+           + (useMaxMemoryEstimates ? maxBytesPerRowForAggregators : actualRowSizeForAggregators);

Review comment:
       Fixed.






[GitHub] [druid] kfaraz commented on a change in pull request #12073: [WIP] Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r782359628



##########
File path: extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
##########
@@ -64,6 +86,25 @@ public void aggregate()
     }
   }
 
+  @Override
+  public long aggregateWithSize()

Review comment:
       Re-structured to remove dependence between the methods `initUnion()` and `aggregateWithSize()`






[GitHub] [druid] kfaraz commented on a change in pull request #12073: Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r782722614



##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
##########
@@ -478,6 +479,11 @@ public IncrementalIndexAddResult add(InputRow row) throws IndexSizeExceededExcep
     return add(row, false);
   }
 
+  public IncrementalIndexAddResult add(InputRow row, boolean skipMaxRowsInMemoryCheck) throws IndexSizeExceededException
+  {
+    return add(row, false, true);

Review comment:
       Thanks a lot for catching this! I must have missed it.






[GitHub] [druid] kfaraz commented on a change in pull request #12073: Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r796302979



##########
File path: processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
##########
@@ -60,20 +60,29 @@ private static String emptyToNullIfNeeded(@Nullable Object o)
   private final MultiValueHandling multiValueHandling;
   private final boolean hasBitmapIndexes;
   private final boolean hasSpatialIndexes;
+  private final boolean useMaxMemoryEstimates;
   private volatile boolean hasMultipleValues = false;
 
-  public StringDimensionIndexer(MultiValueHandling multiValueHandling, boolean hasBitmapIndexes, boolean hasSpatialIndexes)
+  public StringDimensionIndexer(
+      MultiValueHandling multiValueHandling,
+      boolean hasBitmapIndexes,
+      boolean hasSpatialIndexes,
+      boolean useMaxMemoryEstimates
+  )
   {
+    super(useMaxMemoryEstimates ? new DimensionDictionary<>() : new StringDimensionDictionary());
     this.multiValueHandling = multiValueHandling == null ? MultiValueHandling.ofDefault() : multiValueHandling;
     this.hasBitmapIndexes = hasBitmapIndexes;
     this.hasSpatialIndexes = hasSpatialIndexes;
+    this.useMaxMemoryEstimates = useMaxMemoryEstimates;
   }
 
   @Override
-  public int[] processRowValsToUnsortedEncodedKeyComponent(@Nullable Object dimValues, boolean reportParseExceptions)
+  public EncodedKeyComponent<int[]> processRowValsToUnsortedEncodedKeyComponent(@Nullable Object dimValues, boolean reportParseExceptions)
   {
     final int[] encodedDimensionValues;
     final int oldDictSize = dimLookup.size();
+    final long oldDictSizeInBytes = useMaxMemoryEstimates ? 0 : dimLookup.sizeInBytes();

Review comment:
       We need the size of the dictionary before adding the dimension values.
   At the end, we take the final size of dictionary and check the diff.
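
The before/after measurement can be sketched as follows. `SizedStringDictionary`, `EncodedValues`, and the 2-bytes-per-char estimate are assumptions made for illustration; they are not the actual `DimensionDictionary` accounting.

```java
import java.util.HashMap;
import java.util.Map;

class DictionaryDeltaSketch
{
  /** Hypothetical dictionary that tracks a running size estimate. */
  static class SizedStringDictionary
  {
    private final Map<String, Integer> idLookup = new HashMap<>();
    private long sizeInBytes;

    int add(String value)
    {
      Integer existing = idLookup.get(value);
      if (existing != null) {
        return existing; // already present, the size does not change
      }
      int id = idLookup.size();
      idLookup.put(value, id);
      // Assumed estimate: 2 bytes per char, ignoring object headers and map overhead.
      sizeInBytes += 2L * value.length();
      return id;
    }

    long sizeInBytes()
    {
      return sizeInBytes;
    }
  }

  /** Encoded ids plus the growth of the dictionary caused by encoding them. */
  static class EncodedValues
  {
    final int[] ids;
    final long incrementalSizeBytes;

    EncodedValues(int[] ids, long incrementalSizeBytes)
    {
      this.ids = ids;
      this.incrementalSizeBytes = incrementalSizeBytes;
    }
  }

  static EncodedValues encode(SizedStringDictionary dict, String... values)
  {
    final long oldSizeInBytes = dict.sizeInBytes(); // size before adding anything
    final int[] ids = new int[values.length];
    for (int i = 0; i < values.length; i++) {
      ids[i] = dict.add(values[i]);
    }
    // The difference is what this row added to the dictionary.
    return new EncodedValues(ids, dict.sizeInBytes() - oldSizeInBytes);
  }

  public static void main(String[] args)
  {
    SizedStringDictionary dict = new SizedStringDictionary();
    System.out.println(encode(dict, "a", "bb").incrementalSizeBytes);
    System.out.println(encode(dict, "a").incrementalSizeBytes);
  }
}
```

Taking the difference means values already in the dictionary cost nothing for later rows, which is the point of moving away from per-row maximum estimates.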
   






[GitHub] [druid] abhishekagarwal87 commented on pull request #12073: Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on pull request #12073:
URL: https://github.com/apache/druid/pull/12073#issuecomment-1026779027


   @kfaraz CI failures might be legit. Can you fix those before merging?




[GitHub] [druid] kfaraz merged pull request #12073: Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
kfaraz merged pull request #12073:
URL: https://github.com/apache/druid/pull/12073


   




[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #12073: Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r783146278



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/AggregatorAndSize.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+/**
+ * Encapsulates an {@link Aggregator} and the initial size in bytes required by
+ * the Aggregator.
+ */
+public class AggregatorAndSize
+{
+
+  // TODO: include default overhead for object sizes
+
+  private final Aggregator aggregator;
+  private final long initialSizeBytes;

Review comment:
       Can you add more info here, e.g. whether this is the total on-heap footprint including JVM object overhead, or whether that overhead is not considered in this initial size?

##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/Aggregator.java
##########
@@ -42,6 +42,12 @@
 {
   void aggregate();
 
+  default long aggregateWithSize()

Review comment:
       this method needs javadocs

##########
File path: processing/src/main/java/org/apache/druid/segment/EncodedDimensionValue.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import javax.annotation.Nullable;
+
+/**
+ * @param <K> Encoded key component type
+ */
+public class EncodedDimensionValue<K>
+{
+  @Nullable
+  private final K value;
+  private final long incrementalSizeBytes;

Review comment:
       Can you add a comment on what this is the incremental size of?

##########
File path: processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java
##########
@@ -160,4 +173,16 @@ public int getIdForNull()
       lock.readLock().unlock();
     }
   }
+
+  private long getObjectSize(@Nonnull T object)

Review comment:
       +1. 

##########
File path: extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java
##########
@@ -80,6 +81,14 @@ public Aggregator factorize(ColumnSelectorFactory metricFactory)
     return new SketchAggregator(selector, size);
   }
 
+  @Override
+  public AggregatorAndSize factorizeWithSize(ColumnSelectorFactory metricFactory)

Review comment:
       this needs some documentation. 






[GitHub] [druid] kfaraz commented on a change in pull request #12073: Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r796304649



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java
##########
@@ -69,6 +69,24 @@ public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFact
     throw new UOE("Aggregator[%s] cannot vectorize", getClass().getName());
   }
 
+  /**
+   * Creates an {@link Aggregator} based on the provided column selector factory.
+   * The returned value is a holder object which contains both the aggregator
+   * and its initial size in bytes. The callers can then invoke
+   * {@link Aggregator#aggregateWithSize()} to perform aggregation and get back
+   * the incremental memory required in each aggregate call. Combined with the
+   * initial size, this gives the total on-heap memory required by the aggregator.
+   *
+   * This flow does not require invoking {@link #guessAggregatorHeapFootprint(long)}
+   * which tends to over-estimate the required memory.
+   *
+   * @return AggregatorAndSize which contains the actual aggregator and its initial size.
+   */
+  public AggregatorAndSize factorizeWithSize(ColumnSelectorFactory metricFactory)

Review comment:
       Updated the javadoc to advise on the required estimation.
   
   
   > Also, I wonder if there is anything we could do to make sure this method is overridden if aggregateWithSize is implemented
   
   I guess it is okay even if it isn't overridden because we would only be overestimating which would not cause failures, only somewhat poorer estimates.
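
The reasoning can be sketched as follows, assuming a non-overridden `factorizeWithSize()` falls back to a fixed worst-case value as the initial size. The diff above shows only the javadoc, so that fallback is an assumption, and every name and constant below is a hypothetical simplification.

```java
class FallbackEstimateSketch
{
  static final long TRUE_INITIAL_BYTES = 16;   // hypothetical real footprint at creation
  static final long MAX_ESTIMATE_BYTES = 1024; // hypothetical worst-case estimate
  static final long BYTES_PER_CALL = 8;        // hypothetical growth per aggregate call

  /** Aggregator that implements size reporting for each call. */
  static class GrowingAggregator
  {
    long aggregateWithSize()
    {
      return BYTES_PER_CALL;
    }
  }

  /** Simplified stand-in for the holder returned by factorizeWithSize(). */
  static class AggregatorAndSize
  {
    final GrowingAggregator aggregator;
    final long initialSizeBytes;

    AggregatorAndSize(GrowingAggregator aggregator, long initialSizeBytes)
    {
      this.aggregator = aggregator;
      this.initialSizeBytes = initialSizeBytes;
    }
  }

  /** Factory that does NOT provide a tailored initial size. */
  static class Factory
  {
    GrowingAggregator factorize()
    {
      return new GrowingAggregator();
    }

    // Assumed fallback: report the worst-case size as the initial size.
    AggregatorAndSize factorizeWithSize()
    {
      return new AggregatorAndSize(factorize(), MAX_ESTIMATE_BYTES);
    }
  }

  static long estimate(Factory factory, int rows)
  {
    AggregatorAndSize holder = factory.factorizeWithSize();
    long total = holder.initialSizeBytes;
    for (int i = 0; i < rows; i++) {
      total += holder.aggregator.aggregateWithSize();
    }
    return total;
  }

  static long actual(int rows)
  {
    return TRUE_INITIAL_BYTES + rows * BYTES_PER_CALL;
  }

  public static void main(String[] args)
  {
    System.out.println("estimate=" + estimate(new Factory(), 3) + ", actual=" + actual(3));
  }
}
```

Under that assumption the estimate errs on the high side as long as the worst-case value is not smaller than the real initial footprint, so the cost is extra persists rather than failures.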






[GitHub] [druid] kfaraz commented on a change in pull request #12073: Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r783082379



##########
File path: processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java
##########
@@ -160,4 +173,16 @@ public int getIdForNull()
       lock.readLock().unlock();
     }
   }
+
+  private long getObjectSize(@Nonnull T object)

Review comment:
       Thanks for pointing this out! I will see how we can make this cleaner.






[GitHub] [druid] imply-cheddar commented on a change in pull request #12073: [WIP] Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
imply-cheddar commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r777824055



##########
File path: extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
##########
@@ -21,23 +21,45 @@
 
 import org.apache.datasketches.Family;
 import org.apache.datasketches.theta.SetOperation;
+import org.apache.datasketches.theta.Sketch;
 import org.apache.datasketches.theta.Union;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.segment.BaseObjectColumnValueSelector;
 
 import javax.annotation.Nullable;
+import java.lang.reflect.Field;
 import java.util.List;
 
 public class SketchAggregator implements Aggregator
 {
+  private static final Logger LOG = new Logger(SketchAggregator.class);

Review comment:
       This is the nittiest of nits, but the code tends to keep the name of the logger lower case.

##########
File path: extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
##########
@@ -21,23 +21,45 @@
 
 import org.apache.datasketches.Family;
 import org.apache.datasketches.theta.SetOperation;
+import org.apache.datasketches.theta.Sketch;
 import org.apache.datasketches.theta.Union;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.segment.BaseObjectColumnValueSelector;
 
 import javax.annotation.Nullable;
+import java.lang.reflect.Field;
 import java.util.List;
 
 public class SketchAggregator implements Aggregator
 {
+  private static final Logger LOG = new Logger(SketchAggregator.class);
+
   private final BaseObjectColumnValueSelector selector;
   private final int size;
 
   @Nullable
   private Union union;
 
+  @Nullable
+  private Sketch sketch;
+
+  @Nullable
+  private static Field sketchField;

Review comment:
       This can be `final`

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
##########
@@ -225,13 +236,19 @@ protected AddToFactsResult addToFacts(
    * </ul>
    *
    * @param key                          TimeAndDims key
-   * @param maxBytesPerRowForAggregators max size per aggregator
-   *
+   * @param maxBytesPerRowForAggregators max size per row for aggregators
+   * @param actualRowSizeForAggregators  actual aggregator size for this row
    * @return estimated size of row
    */
-  private long estimateRowSizeInBytes(IncrementalIndexRow key, long maxBytesPerRowForAggregators)
+  private long estimateRowSizeInBytes(
+      IncrementalIndexRow key,
+      long maxBytesPerRowForAggregators,
+      long actualRowSizeForAggregators
+  )
   {
-    return ROUGH_OVERHEAD_PER_MAP_ENTRY + key.estimateBytesInMemory() + maxBytesPerRowForAggregators;
+    return ROUGH_OVERHEAD_PER_MAP_ENTRY
+           + key.estimateBytesInMemory()
+           + (useMaxMemoryEstimates ? maxBytesPerRowForAggregators : actualRowSizeForAggregators);

Review comment:
       In reading this code, I found myself wondering: why push the choice of which value to use down this low and add an extra parameter here, instead of changing the call site of `estimateRowSizeInBytes` to check the flag and pass the "correct" value?
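
   A minimal sketch of that alternative, under simplified assumptions: `RowSizeEstimateSketch`, `estimateAtCallSite`, and the placeholder overhead value are hypothetical stand-ins, not the actual Druid code. The helper takes a single aggregator size, and the caller picks which one to pass.

```java
// Hypothetical, simplified stand-in for the row size estimate; not the actual Druid code.
class RowSizeEstimateSketch
{
  // Assumed placeholder value; the real constant lives in OnheapIncrementalIndex.
  static final long ROUGH_OVERHEAD_PER_MAP_ENTRY = 36L;

  // The helper takes one aggregator size and does not need to know about the flag.
  static long estimateRowSizeInBytes(long keySizeBytes, long aggregatorSizeBytes)
  {
    return ROUGH_OVERHEAD_PER_MAP_ENTRY + keySizeBytes + aggregatorSizeBytes;
  }

  // The call site decides which aggregator size is the one to pass.
  static long estimateAtCallSite(
      boolean useMaxMemoryEstimates,
      long keySizeBytes,
      long maxBytesPerRowForAggregators,
      long actualRowSizeForAggregators
  )
  {
    final long aggregatorSizeBytes =
        useMaxMemoryEstimates ? maxBytesPerRowForAggregators : actualRowSizeForAggregators;
    return estimateRowSizeInBytes(keySizeBytes, aggregatorSizeBytes);
  }
}
```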

##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/SizedAggregator.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+/**
+ * Encapsulates an {@link Aggregator} and the initial size in bytes required by
+ * the Aggregator.
+ */
+public class SizedAggregator

Review comment:
       Naming: given that this isn't actually an `Aggregator` object, it shouldn't end with `Aggregator`.  How about `AggregatorAndSize` instead?

##########
File path: extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
##########
@@ -64,6 +86,25 @@ public void aggregate()
     }
   }
 
+  @Override
+  public long aggregateWithSize()

Review comment:
       As I read this code, there are two different "lifecycles" happening: one where the normal `aggregate()` is called and uses `initUnion()`, and another where `aggregateWithSize()` is called and expects something external to have called `getInitialSizeBytes()`.
   
   I find myself wondering if it wouldn't be nicer to have two concrete classes, each specialized to one lifecycle, rather than mixing the code together.  There's probably still a lot of room for reuse, but with the current mixing, it seems relatively easy to accidentally use one when the other should have been used. If we separate out the classes, then the `aggregate()` method of the sized one can throw an `UnsupportedOperationException`, which ensures that it's never called when sizing is supposed to be used.
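
   A rough sketch of the two-class split, using a trivial counter instead of a theta sketch. The interface and class names are hypothetical. The default `aggregateWithSize()` mirrors the behavior described in the PR description (call `aggregate()` and return 0).

```java
// Hypothetical minimal interface mirroring the two methods discussed in this PR.
interface AggregatorSketch
{
  void aggregate();

  // Default described in the PR: delegate to aggregate() and report no growth.
  default long aggregateWithSize()
  {
    aggregate();
    return 0L;
  }
}

// Lifecycle 1: plain aggregation, no size tracking.
class PlainCountAggregator implements AggregatorSketch
{
  long count;

  @Override
  public void aggregate()
  {
    count++;
  }
}

// Lifecycle 2: size-tracking aggregation only.
class SizedCountAggregator implements AggregatorSketch
{
  long count;

  @Override
  public void aggregate()
  {
    // Fails fast if the non-sized path is used by mistake.
    throw new UnsupportedOperationException("Use aggregateWithSize() when sizing is enabled");
  }

  @Override
  public long aggregateWithSize()
  {
    count++;
    // A counter is a fixed-size long, so a call never needs additional bytes.
    return 0L;
  }
}
```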

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
##########
@@ -849,6 +849,8 @@ private TaskStatus generateAndPublishSegments(
     final PartitionsSpec partitionsSpec = partitionAnalysis.getPartitionsSpec();
     final IndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
     final long pushTimeout = tuningConfig.getPushTimeout();
+    tuningConfig.useMaxMemoryEstimates = getContextValue(

Review comment:
       If it's on the tuning config, then it can be taken as another parameter from the tuning config's JSON and doesn't need to be on the context.  Setting values on the config like this is a bit dangerous, so let's just take it on the constructor and stop looking at the context of the Task.

##########
File path: processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java
##########
@@ -96,6 +99,12 @@ public int size()
     }
   }
 
+  public long sizeInBytes()
+  {
+    // TODO: size of map/list object itself?

Review comment:
       Make sure to do something to remove the TODOs before merge.  Either implement or add a comment that explains what's missing and why it's potentially an issue or just delete.

##########
File path: processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java
##########
@@ -114,6 +123,11 @@ public int add(@Nullable T originalValue)
       final int index = idToValue.size();
       valueToId.put(originalValue, index);
       idToValue.add(originalValue);
+
+      // Add size of the String and two of its references to the total size
+      // TODO: map entry overhead?
+      sizeInBytes.addAndGet(getObjectSize(originalValue) + Long.BYTES + Long.BYTES);

Review comment:
       Instead of relying on the comment to be able to read what this code is doing, you can write it as
   
   ```
   long sizeOfReference = Long.BYTES;
   long sizeOfString = getObjectSize(originalValue);
   sizeInBytes.addAndGet(sizeOfString + (2 * sizeOfReference));
   ```
   
   Now, the actual code is saying the same thing that your comment was.

##########
File path: extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
##########
@@ -133,4 +174,34 @@ static void updateUnion(Union union, Object update)
       throw new ISE("Illegal type received while theta sketch merging [%s]", update.getClass());
     }
   }
+
+  /**
+   * Gets the initial size of this aggregator in bytes.
+   */
+  public long getInitialSizeBytes()
+  {
+    // SketchAggregator has 3 references and an int
+    // UnionImpl has a reference, a short, a long, a boolean
+    long sizeOfReferences = 3L * Long.BYTES + Integer.BYTES
+                            + Long.BYTES + Short.BYTES + Long.BYTES + 1;
+    if (sketchField == null) {
+      return sizeOfReferences;
+    }
+
+    // Initialize the sketch if not already initialized
+    if (union == null) {
+      initUnion();

Review comment:
       This feels a little bit wrong to me to init on the initial sizing.  If it starts out `null`, then its size is `0`.  It seems like we should either do this lazily, OR we should initialize the union on construction.

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
##########
@@ -240,22 +257,46 @@ public int getLastRowIndex()
     return indexIncrement.get() - 1;
   }
 
-  private void factorizeAggs(
+  /**
+   * Creates aggregators for the given aggregator factories.
+   *
+   * @return Total initial size in bytes required by all the aggregators.
+   * This value is non-zero only when {@link #useMaxMemoryEstimates} is false.
+   */
+  private long factorizeAggs(
       AggregatorFactory[] metrics,
       Aggregator[] aggs,
       ThreadLocal<InputRow> rowContainer,
       InputRow row
   )
   {
+    long totalInitialSizeBytes = 0L;
     rowContainer.set(row);
     for (int i = 0; i < metrics.length; i++) {
       final AggregatorFactory agg = metrics[i];
-      aggs[i] = agg.factorize(selectors.get(agg.getName()));
+
+      if (useMaxMemoryEstimates) {
+        aggs[i] = agg.factorize(selectors.get(agg.getName()));
+      } else {
+        SizedAggregator sizedAggregator =
+            agg.factorizeSized(selectors.get(agg.getName()));
+        aggs[i] = sizedAggregator.getAggregator();
+        totalInitialSizeBytes += sizedAggregator.getInitialSizeBytes();
+      }
     }
     rowContainer.set(null);
+
+    return useMaxMemoryEstimates ? 0 : totalInitialSizeBytes;

Review comment:
       You've got the same check in multiple places.  I don't think you need the one here as `totalInitialSizeBytes` will be 0 in the `useMaxMemoryEstimates` case anyway.  But if you really want to ensure that the two code paths don't mix, move the `if(useMaxMemoryEstimates)` check to the top level and write the loop twice.  Once in a block that always returns `0` and another in a block that returns an aggregated sum.
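
   A sketch of the "write the loop twice" variant, with hypothetical stand-ins (`Factory`, `AggAndSize`) for the factory and the holder type. The point is only that each code path is a separate block and the max-estimate block always returns `0`.

```java
class FactorizeAggsSketch
{
  // Hypothetical holder, analogous to the SizedAggregator discussed in this PR.
  static class AggAndSize
  {
    final Object aggregator;
    final long initialSizeBytes;

    AggAndSize(Object aggregator, long initialSizeBytes)
    {
      this.aggregator = aggregator;
      this.initialSizeBytes = initialSizeBytes;
    }
  }

  // Hypothetical stand-in for AggregatorFactory.
  interface Factory
  {
    Object factorize();

    AggAndSize factorizeWithSize();
  }

  static long factorizeAggs(Factory[] metrics, Object[] aggs, boolean useMaxMemoryEstimates)
  {
    if (useMaxMemoryEstimates) {
      // Max-estimate path: no initial sizes are collected.
      for (int i = 0; i < metrics.length; i++) {
        aggs[i] = metrics[i].factorize();
      }
      return 0L;
    }

    // Sized path: sum the initial size reported for each aggregator.
    long totalInitialSizeBytes = 0L;
    for (int i = 0; i < metrics.length; i++) {
      AggAndSize aggAndSize = metrics[i].factorizeWithSize();
      aggs[i] = aggAndSize.aggregator;
      totalInitialSizeBytes += aggAndSize.initialSizeBytes;
    }
    return totalInitialSizeBytes;
  }
}
```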

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
##########
@@ -265,11 +306,12 @@ private void doAggregate(
   {
     rowContainer.set(row);
 
+    long totalIncrementalBytes = 0L;
     for (int i = 0; i < aggs.length; i++) {
       final Aggregator agg = aggs[i];
       synchronized (agg) {
         try {
-          agg.aggregate();
+          totalIncrementalBytes += agg.aggregateWithSize();

Review comment:
       This is going to call the `WithSize` version even when it's not supposed to (when it's using the max size).  Please adjust to only call the `WithSize` version when the flag is set.
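
   Roughly what the adjusted loop could look like, as a sketch with a hypothetical `Agg` interface in place of `Aggregator` and with the exception handling left out.

```java
class DoAggregateSketch
{
  // Hypothetical stand-in for the Aggregator interface.
  interface Agg
  {
    void aggregate();

    long aggregateWithSize();
  }

  // Returns the total incremental bytes, which stays 0 when max estimates are in use.
  static long doAggregate(Agg[] aggs, boolean useMaxMemoryEstimates)
  {
    long totalIncrementalBytes = 0L;
    for (Agg agg : aggs) {
      synchronized (agg) {
        if (useMaxMemoryEstimates) {
          agg.aggregate();
        } else {
          totalIncrementalBytes += agg.aggregateWithSize();
        }
      }
    }
    return totalIncrementalBytes;
  }
}
```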

##########
File path: server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
##########
@@ -34,6 +34,13 @@
 
   boolean isSkipBytesInMemoryOverheadCheck();
 
+  /**
+   * Whether maximum memory usage should be considered in estimation.
+   */
+  default boolean isUseMaxMemoryEstimates() {

Review comment:
       Should this really be a default method instead of becoming a real thing on the config object?

##########
File path: extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
##########
@@ -21,23 +21,45 @@
 
 import org.apache.datasketches.Family;
 import org.apache.datasketches.theta.SetOperation;
+import org.apache.datasketches.theta.Sketch;
 import org.apache.datasketches.theta.Union;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.segment.BaseObjectColumnValueSelector;
 
 import javax.annotation.Nullable;
+import java.lang.reflect.Field;
 import java.util.List;
 
 public class SketchAggregator implements Aggregator
 {
+  private static final Logger LOG = new Logger(SketchAggregator.class);
+
   private final BaseObjectColumnValueSelector selector;
   private final int size;
 
   @Nullable
   private Union union;
 
+  @Nullable
+  private Sketch sketch;
+
+  @Nullable
+  private static Field sketchField;
+  static {
+    try {
+      sketchField = Class.forName("org.apache.datasketches.theta.UnionImpl")
+                         .getDeclaredField("gadget_");
+      sketchField.setAccessible(true);
+    }
+    catch (NoSuchFieldException | ClassNotFoundException e) {
+      LOG.error(e, "Could not initialize 'sketchField'");

Review comment:
       This will only happen if someone happens to have loaded a new/different version of sketches than is actually depended on by this current code.  If that happens, this error will put something into the logs that will be ignored (people don't look at logs until something actually explodes) and then silently ignore things.  When they are silently ignored, the estimation becomes incorrect and potentially starts causing OOMs where OOMs didn't exist previously.  If this happens, it will be super hard to track down why it happened.
   
   I would recommend that we actually explode loudly throwing an error out of the static initializer (which should effectively kill the process from actually starting in the first place).  If we want a way for someone to say "I know what I'm doing, ignore this please", we can add an extra config that the error message in the exception points to as a way to ignore things.
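
   A sketch of the fail-loudly lookup with an opt-out. Everything here is illustrative: `ThirdPartyUnion` stands in for the sketches class, and the property name `example.sketch.ignoreMissingField` is a made-up placeholder for whatever config we would actually add.

```java
// Hypothetical class standing in for a third-party type with a private field.
class ThirdPartyUnion
{
  private final Object gadget_ = new Object();
}

class FieldLookupSketch
{
  // Hypothetical property name; the error message points to it as the escape hatch.
  static final String IGNORE_PROPERTY = "example.sketch.ignoreMissingField";

  // Resolved once at class initialization; throws unless the opt-out property is set.
  static final java.lang.reflect.Field GADGET_FIELD =
      lookup(ThirdPartyUnion.class, "gadget_", Boolean.getBoolean(IGNORE_PROPERTY));

  static java.lang.reflect.Field lookup(Class<?> clazz, String fieldName, boolean ignoreMissing)
  {
    try {
      java.lang.reflect.Field field = clazz.getDeclaredField(fieldName);
      field.setAccessible(true);
      return field;
    }
    catch (NoSuchFieldException e) {
      if (ignoreMissing) {
        // The caller explicitly accepted less accurate estimates.
        return null;
      }
      throw new IllegalStateException(
          "Field [" + fieldName + "] not found on [" + clazz.getName()
          + "]. Set -D" + IGNORE_PROPERTY + "=true to fall back to less accurate estimates.",
          e
      );
    }
  }
}
```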




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] kfaraz commented on a change in pull request #12073: Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r785661963



##########
File path: processing/src/main/java/org/apache/druid/segment/DimensionIndexer.java
##########
@@ -127,7 +127,7 @@
    * @return An array containing an encoded representation of the input row value.
    */
   @Nullable
-  EncodedKeyComponentType processRowValsToUnsortedEncodedKeyComponent(@Nullable Object dimValues, boolean reportParseExceptions);
+  EncodedDimensionValue<EncodedKeyComponentType> processRowValsToUnsortedEncodedKeyComponent(@Nullable Object dimValues, boolean reportParseExceptions);

Review comment:
       Fixed.






[GitHub] [druid] kfaraz commented on a change in pull request #12073: Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r785656058



##########
File path: extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java
##########
@@ -80,6 +81,14 @@ public Aggregator factorize(ColumnSelectorFactory metricFactory)
     return new SketchAggregator(selector, size);
   }
 
+  @Override
+  public AggregatorAndSize factorizeWithSize(ColumnSelectorFactory metricFactory)

Review comment:
       Added.






[GitHub] [druid] kfaraz commented on a change in pull request #12073: [WIP] Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r777844444



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/SizedAggregator.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+/**
+ * Encapsulates an {@link Aggregator} and the initial size in bytes required by
+ * the Aggregator.
+ */
+public class SizedAggregator

Review comment:
       Sounds much better 👍 






[GitHub] [druid] kfaraz commented on a change in pull request #12073: [WIP] Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r782358233



##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
##########
@@ -265,11 +306,12 @@ private void doAggregate(
   {
     rowContainer.set(row);
 
+    long totalIncrementalBytes = 0L;
     for (int i = 0; i < aggs.length; i++) {
       final Aggregator agg = aggs[i];
       synchronized (agg) {
         try {
-          agg.aggregate();
+          totalIncrementalBytes += agg.aggregateWithSize();

Review comment:
       Fixed.

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
##########
@@ -240,22 +257,46 @@ public int getLastRowIndex()
     return indexIncrement.get() - 1;
   }
 
-  private void factorizeAggs(
+  /**
+   * Creates aggregators for the given aggregator factories.
+   *
+   * @return Total initial size in bytes required by all the aggregators.
+   * This value is non-zero only when {@link #useMaxMemoryEstimates} is false.
+   */
+  private long factorizeAggs(
       AggregatorFactory[] metrics,
       Aggregator[] aggs,
       ThreadLocal<InputRow> rowContainer,
       InputRow row
   )
   {
+    long totalInitialSizeBytes = 0L;
     rowContainer.set(row);
     for (int i = 0; i < metrics.length; i++) {
       final AggregatorFactory agg = metrics[i];
-      aggs[i] = agg.factorize(selectors.get(agg.getName()));
+
+      if (useMaxMemoryEstimates) {
+        aggs[i] = agg.factorize(selectors.get(agg.getName()));
+      } else {
+        SizedAggregator sizedAggregator =
+            agg.factorizeSized(selectors.get(agg.getName()));
+        aggs[i] = sizedAggregator.getAggregator();
+        totalInitialSizeBytes += sizedAggregator.getInitialSizeBytes();
+      }
     }
     rowContainer.set(null);
+
+    return useMaxMemoryEstimates ? 0 : totalInitialSizeBytes;

Review comment:
       Fixed.






[GitHub] [druid] kfaraz commented on pull request #12073: Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
kfaraz commented on pull request #12073:
URL: https://github.com/apache/druid/pull/12073#issuecomment-1011056846


   > have you done any measurement of the performance impact before/after this change so we know what we are getting ourselves into?
   
   Thanks for the review, @clintropolis !
   I don't have enough perf numbers yet which is why I have put the changes behind a flag for the time being.
   I am working on the perf evaluation. Once we are satisfied with the numbers, we can get rid of the flag altogether.




[GitHub] [druid] kfaraz commented on a change in pull request #12073: Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r785662864



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/AggregatorAndSize.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+/**
+ * Encapsulates an {@link Aggregator} and the initial size in bytes required by
+ * the Aggregator.
+ */
+public class AggregatorAndSize
+{
+
+  // TODO: include default overhead for object sizes
+
+  private final Aggregator aggregator;
+  private final long initialSizeBytes;

Review comment:
       Added. It should account for JVM object overhead too.

##########
File path: processing/src/main/java/org/apache/druid/segment/EncodedDimensionValue.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import javax.annotation.Nullable;
+
+/**
+ * @param <K> Encoded key component type
+ */
+public class EncodedDimensionValue<K>
+{
+  @Nullable
+  private final K value;
+  private final long incrementalSizeBytes;

Review comment:
       Added.






[GitHub] [druid] clintropolis commented on a change in pull request #12073: Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
clintropolis commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r791614256



##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java
##########
@@ -69,6 +69,24 @@ public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFact
     throw new UOE("Aggregator[%s] cannot vectorize", getClass().getName());
   }
 
+  /**
+   * Creates an {@link Aggregator} based on the provided column selector factory.
+   * The returned value is a holder object which contains both the aggregator
+   * and its initial size in bytes. The callers can then invoke
+   * {@link Aggregator#aggregateWithSize()} to perform aggregation and get back
+   * the incremental memory required in each aggregate call. Combined with the
+   * initial size, this gives the total on-heap memory required by the aggregator.
+   *
+   * This flow does not require invoking {@link #guessAggregatorHeapFootprint(long)}
+   * which tends to over-estimate the required memory.
+   *
+   * @return AggregatorAndSize which contains the actual aggregator and its initial size.
+   */
+  public AggregatorAndSize factorizeWithSize(ColumnSelectorFactory metricFactory)

Review comment:
       Same question about the contract for the returned sizes. Also, I wonder if there is anything we could do to make sure this method is overridden if `aggregateWithSize` is implemented, so that the initial size is not the max size...

##########
File path: processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
##########
@@ -60,20 +60,29 @@ private static String emptyToNullIfNeeded(@Nullable Object o)
   private final MultiValueHandling multiValueHandling;
   private final boolean hasBitmapIndexes;
   private final boolean hasSpatialIndexes;
+  private final boolean useMaxMemoryEstimates;
   private volatile boolean hasMultipleValues = false;
 
-  public StringDimensionIndexer(MultiValueHandling multiValueHandling, boolean hasBitmapIndexes, boolean hasSpatialIndexes)
+  public StringDimensionIndexer(
+      MultiValueHandling multiValueHandling,
+      boolean hasBitmapIndexes,
+      boolean hasSpatialIndexes,
+      boolean useMaxMemoryEstimates
+  )
   {
+    super(useMaxMemoryEstimates ? new DimensionDictionary<>() : new StringDimensionDictionary());

Review comment:
       nit: this seems strange/confusing. Why wouldn't `StringDimensionIndexer` always use `StringDimensionDictionary` here? It seems like we could just pass in a value of `false` to control the value of `computeOnHeapSize` instead of sometimes not using `StringDimensionDictionary`.
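
   Something along these lines, as a simplified sketch: `StringDictionarySketch` is hypothetical and the per-value size formula is a rough assumption for illustration, not the real estimate. The same dictionary class is always used, and the flag only decides whether sizes are tracked.

```java
// Hypothetical, simplified dictionary; not the actual StringDimensionDictionary.
class StringDictionarySketch
{
  private final boolean computeOnHeapSize;
  private final java.util.Map<String, Integer> valueToId = new java.util.HashMap<>();
  private final java.util.List<String> idToValue = new java.util.ArrayList<>();
  private long sizeInBytes = 0L;

  StringDictionarySketch(boolean computeOnHeapSize)
  {
    this.computeOnHeapSize = computeOnHeapSize;
  }

  int add(String value)
  {
    Integer existing = valueToId.get(value);
    if (existing != null) {
      return existing;
    }
    final int id = idToValue.size();
    valueToId.put(value, id);
    idToValue.add(value);
    if (computeOnHeapSize) {
      // Rough assumption for illustration: 2 bytes per char plus two 8-byte references.
      sizeInBytes += 2L * value.length() + 2L * Long.BYTES;
    }
    return id;
  }

  long sizeInBytes()
  {
    return sizeInBytes;
  }
}
```

   The indexer would then construct it as `new StringDictionarySketch(!useMaxMemoryEstimates)`.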

##########
File path: processing/src/main/java/org/apache/druid/segment/StringDimensionIndexer.java
##########
@@ -60,20 +60,29 @@ private static String emptyToNullIfNeeded(@Nullable Object o)
   private final MultiValueHandling multiValueHandling;
   private final boolean hasBitmapIndexes;
   private final boolean hasSpatialIndexes;
+  private final boolean useMaxMemoryEstimates;
   private volatile boolean hasMultipleValues = false;
 
-  public StringDimensionIndexer(MultiValueHandling multiValueHandling, boolean hasBitmapIndexes, boolean hasSpatialIndexes)
+  public StringDimensionIndexer(
+      MultiValueHandling multiValueHandling,
+      boolean hasBitmapIndexes,
+      boolean hasSpatialIndexes,
+      boolean useMaxMemoryEstimates
+  )
   {
+    super(useMaxMemoryEstimates ? new DimensionDictionary<>() : new StringDimensionDictionary());
     this.multiValueHandling = multiValueHandling == null ? MultiValueHandling.ofDefault() : multiValueHandling;
     this.hasBitmapIndexes = hasBitmapIndexes;
     this.hasSpatialIndexes = hasSpatialIndexes;
+    this.useMaxMemoryEstimates = useMaxMemoryEstimates;
   }
 
   @Override
-  public int[] processRowValsToUnsortedEncodedKeyComponent(@Nullable Object dimValues, boolean reportParseExceptions)
+  public EncodedKeyComponent<int[]> processRowValsToUnsortedEncodedKeyComponent(@Nullable Object dimValues, boolean reportParseExceptions)
   {
     final int[] encodedDimensionValues;
     final int oldDictSize = dimLookup.size();
+    final long oldDictSizeInBytes = useMaxMemoryEstimates ? 0 : dimLookup.sizeInBytes();

Review comment:
       nit: this is only used inside of the else near the end of the method

##########
File path: extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
##########
@@ -21,23 +21,43 @@
 
 import org.apache.datasketches.Family;
 import org.apache.datasketches.theta.SetOperation;
+import org.apache.datasketches.theta.Sketch;
 import org.apache.datasketches.theta.Union;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.segment.BaseObjectColumnValueSelector;
 
 import javax.annotation.Nullable;
+import java.lang.reflect.Field;
 import java.util.List;
 
 public class SketchAggregator implements Aggregator
 {
+
   private final BaseObjectColumnValueSelector selector;
   private final int size;
 
   @Nullable
   private Union union;
 
+  @Nullable
+  private Sketch sketch;
+
+  @Nullable
+  private static final Field SKETCH_FIELD;
+
+  static {
+    try {
+      SKETCH_FIELD = Class.forName("org.apache.datasketches.theta.UnionImpl")
+                          .getDeclaredField("gadget_");
+      SKETCH_FIELD.setAccessible(true);
+    }
+    catch (NoSuchFieldException | ClassNotFoundException e) {
+      throw new ISE(e, "Could not initialize SketchAggregator");
+    }

Review comment:
       this seems worth a comment, and maybe a link to the code?

##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/Aggregator.java
##########
@@ -42,6 +42,12 @@
 {
   void aggregate();
 
+  default long aggregateWithSize()

Review comment:
       What is the contract of this value? I don't see the word "estimate" in here, but I think it probably should be there. Should implementors over-estimate if exact sizing is not possible, or is under-estimating fine? Should there be a warning when the default estimate is used? (I imagine this would be very noisy if done per aggregate call, so I don't really recommend doing it here.)
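
   To make the question concrete, here is a sketch of how I read the intended flow: initial size plus the sum of per-call deltas gives the running estimate. The names are hypothetical, and the implementor counts only the backing array payload, so it under-estimates by ignoring object headers.

```java
// Hypothetical simplified interface; the default mirrors the behavior described in the PR.
interface SizeReportingAggregator
{
  void aggregate();

  // Returns the estimated number of additional bytes used by this call.
  default long aggregateWithSize()
  {
    aggregate();
    return 0L;
  }
}

// Collects values into a growing array and reports an estimate of the growth.
class LongCollectingAggregator implements SizeReportingAggregator
{
  private long[] values = new long[2];
  private int numValues = 0;

  // Estimated initial footprint: only the backing array payload is counted here.
  long getInitialSizeBytes()
  {
    return (long) values.length * Long.BYTES;
  }

  @Override
  public void aggregate()
  {
    aggregateWithSize();
  }

  @Override
  public long aggregateWithSize()
  {
    long deltaBytes = 0L;
    if (numValues == values.length) {
      // Doubling the array is the only point at which memory usage grows.
      long[] grown = java.util.Arrays.copyOf(values, values.length * 2);
      deltaBytes = (long) (grown.length - values.length) * Long.BYTES;
      values = grown;
    }
    values[numValues] = numValues;
    numValues++;
    return deltaBytes;
  }
}
```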






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #12073: Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r792352305



##########
File path: extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
##########
@@ -21,23 +21,43 @@
 
 import org.apache.datasketches.Family;
 import org.apache.datasketches.theta.SetOperation;
+import org.apache.datasketches.theta.Sketch;
 import org.apache.datasketches.theta.Union;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.segment.BaseObjectColumnValueSelector;
 
 import javax.annotation.Nullable;
+import java.lang.reflect.Field;
 import java.util.List;
 
 public class SketchAggregator implements Aggregator
 {
+
   private final BaseObjectColumnValueSelector selector;
   private final int size;
 
   @Nullable
   private Union union;
 
+  @Nullable
+  private Sketch sketch;
+
+  @Nullable
+  private static final Field SKETCH_FIELD;
+
+  static {
+    try {
+      SKETCH_FIELD = Class.forName("org.apache.datasketches.theta.UnionImpl")
+                          .getDeclaredField("gadget_");
+      SKETCH_FIELD.setAccessible(true);
+    }
+    catch (NoSuchFieldException | ClassNotFoundException e) {
+      throw new ISE(e, "Could not initialize SketchAggregator");
+    }

Review comment:
       Exceptions in static initialization blocks can surface as very weird errors, such as a `NoClassDefFoundError` on later uses of the class:
   http://javaeesupportpatterns.blogspot.com/2012/07/javalangnoclassdeffounderror-how-to.html
   
   Maybe we can just move this initialization to the constructor? We need not worry too much about thread safety, since it's OK even if SKETCH_FIELD gets initialized twice.
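A minimal sketch of that idea, under stated assumptions: `UnionStandIn` is a hypothetical stand-in for the library class (so the example runs without DataSketches on the classpath), and `LazyFieldAccess` with its method names is invented for illustration. The field is resolved on first use rather than in a `static` block, so a lookup failure surfaces as an ordinary exception at the call site.

```java
import java.lang.reflect.Field;

// Hypothetical stand-in for a library class with a private field
// (it plays the role of UnionImpl and its gadget_ field).
final class UnionStandIn
{
  private final String gadget_ = "sketch";
}

final class LazyFieldAccess
{
  // Not final: resolved lazily instead of in a static initializer.
  private static volatile Field gadgetField;

  static Field gadgetField()
  {
    Field field = gadgetField;
    if (field == null) {
      try {
        field = UnionStandIn.class.getDeclaredField("gadget_");
        field.setAccessible(true);
      }
      catch (NoSuchFieldException e) {
        // Thrown at the call site, not during class initialization.
        throw new IllegalStateException("Could not find field gadget_", e);
      }
      // Benign race: two threads may both resolve the field; either result is usable.
      gadgetField = field;
    }
    return field;
  }

  static Object readGadget(UnionStandIn union)
  {
    try {
      return gadgetField().get(union);
    }
    catch (IllegalAccessException e) {
      throw new IllegalStateException("Could not read field gadget_", e);
    }
  }

  public static void main(String[] args)
  {
    System.out.println(readGadget(new UnionStandIn()));
  }
}
```

The trade-off is a null check on each access in exchange for a failure mode that is easier to diagnose.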
   






[GitHub] [druid] lgtm-com[bot] commented on pull request #12073: [WIP] Update Aggregator and AggregatorFactory interfaces to improve mem estimates

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #12073:
URL: https://github.com/apache/druid/pull/12073#issuecomment-1003881227


   This pull request **introduces 2 alerts** when merging 780e50da8202e31f69d9c6ceacdd42ed4f14e57c into fe71fc414fbe0627290487cce59055dd65794ff8 - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-7f379f318b591ec382ebdf0bf6436ad5d14e4860)
   
   **new alerts:**
   
   * 1 for Result of multiplication cast to wider type
   * 1 for Useless null check
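The alert does not say which expression triggered it, so the following is only a generic illustration of what "Result of multiplication cast to wider type" usually means; `WideningMultiplyDemo` and its method and parameter names are hypothetical. When two `int` values are multiplied, the product is computed in 32-bit arithmetic and can overflow before it is widened to `long`.

```java
final class WideningMultiplyDemo
{
  // Multiplies in int arithmetic; overflow happens BEFORE the widening to long.
  static long overflowing(int rows, int bytesPerRow)
  {
    return rows * bytesPerRow;
  }

  // Casting one operand first makes the whole multiplication happen in long.
  static long widened(int rows, int bytesPerRow)
  {
    return (long) rows * bytesPerRow;
  }

  public static void main(String[] args)
  {
    System.out.println("int multiply:  " + overflowing(100_000, 50_000));
    System.out.println("long multiply: " + widened(100_000, 50_000));
  }
}
```

For memory estimates of the form rows times bytes per row, the usual fix is to cast one operand to `long` before multiplying.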

