You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ab...@apache.org on 2021/11/28 07:51:54 UTC

[druid] branch master updated: Improve on-heap aggregator footprint estimates. (#11950)

This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 93aeaf4  Improve on-heap aggregator footprint estimates. (#11950)
93aeaf4 is described below

commit 93aeaf4801f65826f01cac0a3eedb4b4d3fdecef
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Sat Nov 27 23:51:24 2021 -0800

    Improve on-heap aggregator footprint estimates. (#11950)
    
    
    Add a "guessAggregatorHeapFootprint" method to AggregatorFactory that
    mitigates #6743 by enabling heap footprint estimates based on a specific
    number of rows. The idea is that at ingestion time, the number of rows
    that go into an aggregator will be 1 (if rollup is off) or will likely
    be a small number (if rollup is on).
    
    It's a heuristic, because of course nothing guarantees that the rollup
    ratio is a small number. But it's a common case, and I expect this logic
    to go wrong much less often than the current logic. Also, when it does
    go wrong, users can fix it by lowering maxRowsInMemory or
    maxBytesInMemory. The current situation is unintuitive: when the
    estimation goes wrong, users get an OutOfMemoryError (OOME), but to fix
    it they actually need to *raise* these limits.
---
 .../quantiles/DoublesSketchAggregatorFactory.java  |  6 ++++++
 .../theta/SketchAggregatorFactory.java             | 25 ++++++++++++++++++++++
 .../DoublesSketchAggregatorFactoryTest.java        | 15 +++++++++++++
 .../theta/SketchAggregatorFactoryTest.java         | 25 ++++++++++++++++++++++
 .../druid/query/aggregation/AggregatorFactory.java | 17 +++++++++++++++
 .../incremental/OnheapIncrementalIndex.java        | 23 +++++++++++++++-----
 6 files changed, 106 insertions(+), 5 deletions(-)

diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java
index 4f7f4cd..145ffcf 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java
@@ -298,6 +298,12 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory
     return Collections.singletonList(fieldName);
   }
 
+  @Override
+  public int guessAggregatorHeapFootprint(long rows)
+  {
+    return DoublesSketch.getUpdatableStorageBytes(k, rows);
+  }
+
   // Quantiles sketches never stop growing, but they do so very slowly.
   // This size must suffice for overwhelming majority of sketches,
   // but some sketches may request more memory on heap and move there
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java
index 7a10a16..a57c547 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java
@@ -21,6 +21,7 @@ package org.apache.druid.query.aggregation.datasketches.theta;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
 import org.apache.datasketches.Family;
 import org.apache.datasketches.Util;
 import org.apache.datasketches.theta.SetOperation;
@@ -48,6 +49,13 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory
 {
   public static final int DEFAULT_MAX_SKETCH_SIZE = 16384;
 
+  // Smallest number of entries in an Aggregator. Each entry is a long. Based on the constructor of
+  // HeapQuickSelectSketch and used by guessAggregatorHeapFootprint.
+  private static final int MIN_ENTRIES_PER_AGGREGATOR = 1 << Util.MIN_LG_ARR_LONGS;
+
+  // Largest preamble size for the sketch stored in an Aggregator, in bytes. Based on SetOperation.getMaxUnionBytes.
+  private static final int LONGEST_POSSIBLE_PREAMBLE_BYTES = Family.UNION.getMaxPreLongs() << 3;
+
   protected final String name;
   protected final String fieldName;
   protected final int size;
@@ -171,6 +179,23 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory
   }
 
   @Override
+  public int guessAggregatorHeapFootprint(long rows)
+  {
+    final int maxEntries = size * 2;
+    final int expectedEntries;
+
+    if (rows > maxEntries) {
+      expectedEntries = maxEntries;
+    } else {
+      // rows is within int range since it's <= maxEntries, so casting is OK.
+      expectedEntries = Math.max(MIN_ENTRIES_PER_AGGREGATOR, Util.ceilingPowerOf2(Ints.checkedCast(rows)));
+    }
+
+    // 8 bytes per entry + largest possible preamble.
+    return Long.BYTES * expectedEntries + LONGEST_POSSIBLE_PREAMBLE_BYTES;
+  }
+
+  @Override
   public int getMaxIntermediateSize()
   {
     return SetOperation.getMaxUnionBytes(size);
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java
index cf1f4f9..d4867c7 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java
@@ -84,6 +84,21 @@ public class DoublesSketchAggregatorFactoryTest
   }
 
   @Test
+  public void testGuessAggregatorHeapFootprint()
+  {
+    DoublesSketchAggregatorFactory factory = new DoublesSketchAggregatorFactory(
+        "myFactory",
+        "myField",
+        128,
+        null
+    );
+    Assert.assertEquals(64, factory.guessAggregatorHeapFootprint(1));
+    Assert.assertEquals(1056, factory.guessAggregatorHeapFootprint(100));
+    Assert.assertEquals(4128, factory.guessAggregatorHeapFootprint(1000));
+    Assert.assertEquals(34848, factory.guessAggregatorHeapFootprint(1_000_000_000_000L));
+  }
+
+  @Test
   public void testMaxIntermediateSize()
   {
     DoublesSketchAggregatorFactory factory = new DoublesSketchAggregatorFactory(
diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactoryTest.java
index bd495ce..61efd5f 100644
--- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactoryTest.java
+++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactoryTest.java
@@ -36,6 +36,31 @@ import org.junit.Test;
 
 public class SketchAggregatorFactoryTest
 {
+  private static final SketchMergeAggregatorFactory AGGREGATOR_16384 =
+      new SketchMergeAggregatorFactory("x", "x", 16384, null, false, null);
+
+  private static final SketchMergeAggregatorFactory AGGREGATOR_32768 =
+      new SketchMergeAggregatorFactory("x", "x", 32768, null, false, null);
+
+  @Test
+  public void testGuessAggregatorHeapFootprint()
+  {
+    Assert.assertEquals(288, AGGREGATOR_16384.guessAggregatorHeapFootprint(1));
+    Assert.assertEquals(1056, AGGREGATOR_16384.guessAggregatorHeapFootprint(100));
+    Assert.assertEquals(262176, AGGREGATOR_16384.guessAggregatorHeapFootprint(1_000_000_000_000L));
+
+    Assert.assertEquals(288, AGGREGATOR_32768.guessAggregatorHeapFootprint(1));
+    Assert.assertEquals(1056, AGGREGATOR_32768.guessAggregatorHeapFootprint(100));
+    Assert.assertEquals(524320, AGGREGATOR_32768.guessAggregatorHeapFootprint(1_000_000_000_000L));
+  }
+
+  @Test
+  public void testMaxIntermediateSize()
+  {
+    Assert.assertEquals(262176, AGGREGATOR_16384.getMaxIntermediateSize());
+    Assert.assertEquals(524320, AGGREGATOR_32768.getMaxIntermediateSize());
+  }
+
   @Test
   public void testResultArraySignature()
   {
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java
index f0e3837..45f9328 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java
@@ -303,6 +303,23 @@ public abstract class AggregatorFactory implements Cacheable
   }
 
   /**
+   * Returns a best guess as to how much memory the on-heap {@link Aggregator} returned by {@link #factorize} will
+   * require when a certain number of rows have been aggregated into it.
+   *
+   * The main user of this method is {@link org.apache.druid.segment.incremental.OnheapIncrementalIndex}, which
+   * uses it to determine when to persist the current in-memory data to disk.
+   *
+   * Important note for callers! In nearly all cases, callers that wish to constrain memory would be better off
+   * using {@link #factorizeBuffered} or {@link #factorizeVector}, which offer precise control over how much memory
+   * is being used.
+   */
+  public int guessAggregatorHeapFootprint(long rows)
+  {
+    // By default, guess that on-heap footprint is equal to off-heap footprint.
+    return getMaxIntermediateSizeWithNulls();
+  }
+
+  /**
    * Return a potentially optimized form of this AggregatorFactory for per-segment queries.
    */
   public AggregatorFactory optimizeForSegment(PerSegmentQueryOptimizationContext optimizationContext)
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index 8ebbeaf..bb24ae4 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -44,7 +44,6 @@ import org.apache.druid.utils.JvmUtils;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -59,6 +58,15 @@ import java.util.concurrent.atomic.AtomicLong;
 public class OnheapIncrementalIndex extends IncrementalIndex
 {
   private static final Logger log = new Logger(OnheapIncrementalIndex.class);
+
+  /**
+   * Constant factor provided to {@link AggregatorFactory#guessAggregatorHeapFootprint(long)} for footprint estimates.
+   * This figure is large enough to catch most common rollup ratios, but not so large that it will cause persists to
+   * happen too often. If an actual workload involves a much higher rollup ratio, then this may lead to excessive
+   * heap usage. Users would have to work around that by lowering maxRowsInMemory or maxBytesInMemory.
+   */
+  private static final long ROLLUP_RATIO_FOR_AGGREGATOR_FOOTPRINT_ESTIMATION = 100;
+
   /**
    * overhead per {@link ConcurrentHashMap.Node}  or {@link java.util.concurrent.ConcurrentSkipListMap.Node} object
    */
@@ -113,11 +121,16 @@ public class OnheapIncrementalIndex extends IncrementalIndex
    */
   private static long getMaxBytesPerRowForAggregators(IncrementalIndexSchema incrementalIndexSchema)
   {
+    final long rowsPerAggregator =
+        incrementalIndexSchema.isRollup() ? ROLLUP_RATIO_FOR_AGGREGATOR_FOOTPRINT_ESTIMATION : 1;
+
     long maxAggregatorIntermediateSize = ((long) Integer.BYTES) * incrementalIndexSchema.getMetrics().length;
-    maxAggregatorIntermediateSize += Arrays.stream(incrementalIndexSchema.getMetrics())
-                                           .mapToLong(aggregator -> aggregator.getMaxIntermediateSizeWithNulls()
-                                                                    + Long.BYTES * 2L)
-                                           .sum();
+
+    for (final AggregatorFactory aggregator : incrementalIndexSchema.getMetrics()) {
+      maxAggregatorIntermediateSize +=
+          (long) aggregator.guessAggregatorHeapFootprint(rowsPerAggregator) + 2L * Long.BYTES;
+    }
+
     return maxAggregatorIntermediateSize;
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org