You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2022/01/04 05:55:07 UTC

[GitHub] [druid] imply-cheddar commented on a change in pull request #12073: [WIP] Update Aggregator and AggregatorFactory interfaces to improve mem estimates

imply-cheddar commented on a change in pull request #12073:
URL: https://github.com/apache/druid/pull/12073#discussion_r777824055



##########
File path: extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
##########
@@ -21,23 +21,45 @@
 
 import org.apache.datasketches.Family;
 import org.apache.datasketches.theta.SetOperation;
+import org.apache.datasketches.theta.Sketch;
 import org.apache.datasketches.theta.Union;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.segment.BaseObjectColumnValueSelector;
 
 import javax.annotation.Nullable;
+import java.lang.reflect.Field;
 import java.util.List;
 
 public class SketchAggregator implements Aggregator
 {
+  private static final Logger LOG = new Logger(SketchAggregator.class);

Review comment:
       This is the nittiest of nits, but the code tends to keep the name of the logger lower case.

##########
File path: extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
##########
@@ -21,23 +21,45 @@
 
 import org.apache.datasketches.Family;
 import org.apache.datasketches.theta.SetOperation;
+import org.apache.datasketches.theta.Sketch;
 import org.apache.datasketches.theta.Union;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.segment.BaseObjectColumnValueSelector;
 
 import javax.annotation.Nullable;
+import java.lang.reflect.Field;
 import java.util.List;
 
 public class SketchAggregator implements Aggregator
 {
+  private static final Logger LOG = new Logger(SketchAggregator.class);
+
   private final BaseObjectColumnValueSelector selector;
   private final int size;
 
   @Nullable
   private Union union;
 
+  @Nullable
+  private Sketch sketch;
+
+  @Nullable
+  private static Field sketchField;

Review comment:
       This can be `final`

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
##########
@@ -225,13 +236,19 @@ protected AddToFactsResult addToFacts(
    * </ul>
    *
    * @param key                          TimeAndDims key
-   * @param maxBytesPerRowForAggregators max size per aggregator
-   *
+   * @param maxBytesPerRowForAggregators max size per row for aggregators
+   * @param actualRowSizeForAggregators  actual aggregator size for this row
    * @return estimated size of row
    */
-  private long estimateRowSizeInBytes(IncrementalIndexRow key, long maxBytesPerRowForAggregators)
+  private long estimateRowSizeInBytes(
+      IncrementalIndexRow key,
+      long maxBytesPerRowForAggregators,
+      long actualRowSizeForAggregators
+  )
   {
-    return ROUGH_OVERHEAD_PER_MAP_ENTRY + key.estimateBytesInMemory() + maxBytesPerRowForAggregators;
+    return ROUGH_OVERHEAD_PER_MAP_ENTRY
+           + key.estimateBytesInMemory()
+           + (useMaxMemoryEstimates ? maxBytesPerRowForAggregators : actualRowSizeForAggregators);

Review comment:
       In reading this code I found myself wondering why push down the choice of which value to use so low and add an extra parameter here instead of change the call site of `estimateRowSizeInBytes` to check and pass the "correct" value?

##########
File path: processing/src/main/java/org/apache/druid/query/aggregation/SizedAggregator.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+/**
+ * Encapsulates an {@link Aggregator} and the initial size in bytes required by
+ * the Aggregator.
+ */
+public class SizedAggregator

Review comment:
       Naming: given that this isn't actually an `Aggregator` object, it shouldn't end with `Aggregator`.  How about `AggregatorAndSize` instead?

##########
File path: extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
##########
@@ -64,6 +86,25 @@ public void aggregate()
     }
   }
 
+  @Override
+  public long aggregateWithSize()

Review comment:
       As I read this code, there's 2 different "lifecycle"s happening.  One where normal `aggregate()` is being called and using `initUnion()` and another where `aggregateWithSize()` is being called and expecting something external to have called `getInitialSizeBytes()`.
   
   I find myself wondering if it wouldn't be nicer to have two concrete classes each specialized to each lifecycle and not trying to mix the code together.  There's probably still a lot of room for re-use, but with the current mixing of the code, it seems relatively simple to accidentally use one when the other should've been used (i.e. if we separate out the classes, then the `aggregate()` method for the sized one can throw an `UnsupportedOperationException` which will ensure that it's never called when sizing is supposed to be used.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
##########
@@ -849,6 +849,8 @@ private TaskStatus generateAndPublishSegments(
     final PartitionsSpec partitionsSpec = partitionAnalysis.getPartitionsSpec();
     final IndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
     final long pushTimeout = tuningConfig.getPushTimeout();
+    tuningConfig.useMaxMemoryEstimates = getContextValue(

Review comment:
       If it's on the tuning config, then it can be taken as another parameter from the tuning config's JSON and doesn't need to be on the context.  Setting values on the config like this is a bit dangerous, so let's just take it on the constructor and stop looking at the context of the Task.

##########
File path: processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java
##########
@@ -96,6 +99,12 @@ public int size()
     }
   }
 
+  public long sizeInBytes()
+  {
+    // TODO: size of map/list object itself?

Review comment:
       Make sure to do something to remove the TODOs before merge.  Either implement or add a comment that explains what's missing and why it's potentially an issue or just delete.

##########
File path: processing/src/main/java/org/apache/druid/segment/DimensionDictionary.java
##########
@@ -114,6 +123,11 @@ public int add(@Nullable T originalValue)
       final int index = idToValue.size();
       valueToId.put(originalValue, index);
       idToValue.add(originalValue);
+
+      // Add size of the String and two of its references to the total size
+      // TODO: map entry overhead?
+      sizeInBytes.addAndGet(getObjectSize(originalValue) + Long.BYTES + Long.BYTES);

Review comment:
       Instead of relying on the comment to be able to read what this code is doing, you can write it as
   
   ```
   long sizeOfReference = Long.BYTES;
   long sizeOfString = getObjectSize(originalValue);
   sizeInBytes.addAndGet(sizeOfString + (2 * sizeOfReference));
   ```
   
   Now, the actual code is saying the same thing that your comment was.

##########
File path: extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
##########
@@ -133,4 +174,34 @@ static void updateUnion(Union union, Object update)
       throw new ISE("Illegal type received while theta sketch merging [%s]", update.getClass());
     }
   }
+
+  /**
+   * Gets the initial size of this aggregator in bytes.
+   */
+  public long getInitialSizeBytes()
+  {
+    // SketchAggregator has 3 references and an int
+    // UnionImpl has a reference, a short, a long, a boolean
+    long sizeOfReferences = 3L * Long.BYTES + Integer.BYTES
+                            + Long.BYTES + Short.BYTES + Long.BYTES + 1;
+    if (sketchField == null) {
+      return sizeOfReferences;
+    }
+
+    // Initialize the sketch if not already initialized
+    if (union == null) {
+      initUnion();

Review comment:
       This feels a little bit wrong to me to init on the initial sizing.  If it starts out `null`, then its size is `0`.  It seems like we should either do this lazily, OR we should initialize the union on construction.

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
##########
@@ -240,22 +257,46 @@ public int getLastRowIndex()
     return indexIncrement.get() - 1;
   }
 
-  private void factorizeAggs(
+  /**
+   * Creates aggregators for the given aggregator factories.
+   *
+   * @return Total initial size in bytes required by all the aggregators.
+   * This value is non-zero only when {@link #useMaxMemoryEstimates} is false.
+   */
+  private long factorizeAggs(
       AggregatorFactory[] metrics,
       Aggregator[] aggs,
       ThreadLocal<InputRow> rowContainer,
       InputRow row
   )
   {
+    long totalInitialSizeBytes = 0L;
     rowContainer.set(row);
     for (int i = 0; i < metrics.length; i++) {
       final AggregatorFactory agg = metrics[i];
-      aggs[i] = agg.factorize(selectors.get(agg.getName()));
+
+      if (useMaxMemoryEstimates) {
+        aggs[i] = agg.factorize(selectors.get(agg.getName()));
+      } else {
+        SizedAggregator sizedAggregator =
+            agg.factorizeSized(selectors.get(agg.getName()));
+        aggs[i] = sizedAggregator.getAggregator();
+        totalInitialSizeBytes += sizedAggregator.getInitialSizeBytes();
+      }
     }
     rowContainer.set(null);
+
+    return useMaxMemoryEstimates ? 0 : totalInitialSizeBytes;

Review comment:
       You've got the same check in multiple places.  I don't think you need the one here as `totalInitialSizeBytes` will be 0 in the `useMaxMemoryEstimates` case anyway.  But if you really want to ensure that the two code paths don't mix, move the `if(useMaxMemoryEstimates)` check to the top level and write the loop twice.  Once in a block that always returns `0` and another in a block that returns an aggregated sum.

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
##########
@@ -265,11 +306,12 @@ private void doAggregate(
   {
     rowContainer.set(row);
 
+    long totalIncrementalBytes = 0L;
     for (int i = 0; i < aggs.length; i++) {
       final Aggregator agg = aggs[i];
       synchronized (agg) {
         try {
-          agg.aggregate();
+          totalIncrementalBytes += agg.aggregateWithSize();

Review comment:
       This is going to call the `WithSize` version even when it's not supposed to (when it's using the max size).  Please adjust to only call the `WithSize` version when the flag is set.

##########
File path: server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
##########
@@ -34,6 +34,13 @@
 
   boolean isSkipBytesInMemoryOverheadCheck();
 
+  /**
+   * Whether maximum memory usage should be considered in estimation.
+   */
+  default boolean isUseMaxMemoryEstimates() {

Review comment:
       Should this really be a default method instead of becoming a real thing on the config object?

##########
File path: extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java
##########
@@ -21,23 +21,45 @@
 
 import org.apache.datasketches.Family;
 import org.apache.datasketches.theta.SetOperation;
+import org.apache.datasketches.theta.Sketch;
 import org.apache.datasketches.theta.Union;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.segment.BaseObjectColumnValueSelector;
 
 import javax.annotation.Nullable;
+import java.lang.reflect.Field;
 import java.util.List;
 
 public class SketchAggregator implements Aggregator
 {
+  private static final Logger LOG = new Logger(SketchAggregator.class);
+
   private final BaseObjectColumnValueSelector selector;
   private final int size;
 
   @Nullable
   private Union union;
 
+  @Nullable
+  private Sketch sketch;
+
+  @Nullable
+  private static Field sketchField;
+  static {
+    try {
+      sketchField = Class.forName("org.apache.datasketches.theta.UnionImpl")
+                         .getDeclaredField("gadget_");
+      sketchField.setAccessible(true);
+    }
+    catch (NoSuchFieldException | ClassNotFoundException e) {
+      LOG.error(e, "Could not initialize 'sketchField'");

Review comment:
       This will only happen if someone happens to have loaded a new/different version of sketches than is actually depended on by this current code.  If that happens, this error will put something into the logs that will be ignored (people don't look at logs until something actually explodes) and then silently ignore things.  When they are silently ignored, the estimation becomes incorrect and potentially starts causing OOMs where OOMs didn't exist previously.  If this happens, it will be super hard to track down why it happened.
   
   I would recommend that we actually explode loudly throwing an error out of the static initializer (which should effectively kill the process from actually starting in the first place).  If we want a way for someone to say "I know what I'm doing, ignore this please", we can add an extra config that the error message in the exception points to as a way to ignore things.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org