You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2019/07/31 15:18:52 UTC

[incubator-druid] branch master updated: HllSketch Merge/Build BufferAggregators: Speed up init with prebuilt sketch. (#8194)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 6346131  HllSketch Merge/Build BufferAggregators: Speed up init with prebuilt sketch. (#8194)
6346131 is described below

commit 63461311f8d38fa86d7007756f2320d7829ccd9a
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Wed Jul 31 08:18:42 2019 -0700

    HllSketch Merge/Build BufferAggregators: Speed up init with prebuilt sketch. (#8194)
    
    * HllSketchMergeBufferAggregator: Speed up init by copying prebuilt sketch.
    
    * Remove useless writableRegion call.
    
    * POM variables.
    
    * Fix missing reposition.
    
    * Apply similar optimization to HllSketchBuildBufferAggregator.
    
    * Rename emptySketch -> emptyUnion in merge flavor.
    
    * Adjustments based on review.
    
    * Comment update.
    
    * Additional updates.
    
    * Comment push.
---
 .../druid/benchmark/DataSketchesHllBenchmark.java  | 123 +++++++++++++++++++++
 extensions-core/datasketches/pom.xml               |  12 +-
 .../hll/HllSketchBuildBufferAggregator.java        |  34 +++++-
 .../hll/HllSketchMergeBufferAggregator.java        |  35 ++++--
 4 files changed, 194 insertions(+), 10 deletions(-)

diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/DataSketchesHllBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/DataSketchesHllBenchmark.java
new file mode 100644
index 0000000..3493f55
--- /dev/null
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/DataSketchesHllBenchmark.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.benchmark;
+
+import com.yahoo.sketches.hll.HllSketch;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.aggregation.datasketches.hll.HllSketchMergeAggregatorFactory;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+@Fork(1)
+@State(Scope.Benchmark)
+public class DataSketchesHllBenchmark // JMH benchmarks of init/get/serde for the HllSketch "merge" BufferAggregator.
+{
+  private final AggregatorFactory aggregatorFactory = new HllSketchMergeAggregatorFactory( // "merge" flavor factory
+      "hll",
+      "hll",
+      null, // NOTE(review): null/false arguments presumably fall back to factory defaults - confirm
+      null,
+      false
+  );
+
+  private final ByteBuffer buf = ByteBuffer.allocateDirect(aggregatorFactory.getMaxIntermediateSize()); // shared by all benchmarks
+
+  private BufferAggregator aggregator; // assigned in setUp(), cleared in tearDown()
+
+  @Setup(Level.Trial)
+  public void setUp() // Builds the buffer aggregator from a stub selector factory.
+  {
+    aggregator = aggregatorFactory.factorizeBuffered(
+        new ColumnSelectorFactory() // Stub: every method returns null; the benchmarks below only call init and get.
+        {
+          @Override
+          public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
+          {
+            return null;
+          }
+
+          @Override
+          public ColumnValueSelector makeColumnValueSelector(String columnName)
+          {
+            return null;
+          }
+
+          @Nullable
+          @Override
+          public ColumnCapabilities getColumnCapabilities(String column)
+          {
+            return null;
+          }
+        }
+    );
+  }
+
+  @TearDown(Level.Trial)
+  public void tearDown() // Closes the aggregator and drops the reference.
+  {
+    aggregator.close();
+    aggregator = null;
+  }
+
+  @Benchmark
+  public void init(Blackhole bh) // Times init alone at buffer position 0; "bh" is not used in the body.
+  {
+    aggregator.init(buf, 0);
+  }
+
+  @Benchmark
+  public Object initAndGet() // Times init followed by reading the aggregated object back with get.
+  {
+    aggregator.init(buf, 0);
+    return aggregator.get(buf, 0);
+  }
+
+  @Benchmark
+  public Object initAndSerde() // Times init, get, compact serialization, then deserialization via the factory.
+  {
+    aggregator.init(buf, 0);
+    return aggregatorFactory.deserialize(((HllSketch) aggregator.get(buf, 0)).toCompactByteArray()); // get -> bytes -> deserialize
+  }
+}
diff --git a/extensions-core/datasketches/pom.xml b/extensions-core/datasketches/pom.xml
index db91883..151b554 100644
--- a/extensions-core/datasketches/pom.xml
+++ b/extensions-core/datasketches/pom.xml
@@ -34,11 +34,16 @@
     <relativePath>../../pom.xml</relativePath>
   </parent>
 
+  <properties>
+    <datasketches.core.version>0.13.4</datasketches.core.version>
+    <datasketches.memory.version>0.12.2</datasketches.memory.version>
+  </properties>
+
   <dependencies>
     <dependency>
       <groupId>com.yahoo.datasketches</groupId>
       <artifactId>sketches-core</artifactId>
-      <version>0.13.4</version>
+      <version>${datasketches.core.version}</version>
       <exclusions>
         <exclusion>
           <groupId>com.google.code.findbugs</groupId>
@@ -47,6 +52,11 @@
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>com.yahoo.datasketches</groupId>
+      <artifactId>memory</artifactId>
+      <version>${datasketches.memory.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-math3</artifactId>
     </dependency>
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java
index 5532866..5789127 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java
@@ -23,6 +23,7 @@ import com.google.common.util.concurrent.Striped;
 import com.yahoo.memory.WritableMemory;
 import com.yahoo.sketches.hll.HllSketch;
 import com.yahoo.sketches.hll.TgtHllType;
+import com.yahoo.sketches.hll.Union;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import org.apache.druid.query.aggregation.BufferAggregator;
@@ -42,7 +43,9 @@ import java.util.concurrent.locks.ReadWriteLock;
 public class HllSketchBuildBufferAggregator implements BufferAggregator
 {
 
-  /** for locking per buffer position (power of 2 to make index computation faster) */
+  /**
+   * for locking per buffer position (power of 2 to make index computation faster)
+   */
   private static final int NUM_STRIPES = 64;
 
   private final ColumnValueSelector<Object> selector;
@@ -53,6 +56,13 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator
   private final IdentityHashMap<ByteBuffer, Int2ObjectMap<HllSketch>> sketchCache = new IdentityHashMap<>();
   private final Striped<ReadWriteLock> stripedLock = Striped.readWriteLock(NUM_STRIPES);
 
+  /**
+   * Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty HllSketch image.
+   * {@link HllSketchMergeBufferAggregator} does something similar, but different enough that we don't share code. The
+   * "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link Union} objects.
+   */
+  private final byte[] emptySketch;
+
   public HllSketchBuildBufferAggregator(
       final ColumnValueSelector<Object> selector,
       final int lgK,
@@ -64,13 +74,29 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator
     this.lgK = lgK;
     this.tgtHllType = tgtHllType;
     this.size = size;
+    this.emptySketch = new byte[size];
+
+    //noinspection ResultOfObjectAllocationIgnored (HllSketch writes to "emptySketch" as a side effect of construction)
+    new HllSketch(lgK, tgtHllType, WritableMemory.wrap(emptySketch));
   }
 
   @Override
   public void init(final ByteBuffer buf, final int position)
   {
+    // Copy prebuilt empty sketch object.
+
+    final int oldPosition = buf.position();
+    try {
+      buf.position(position);
+      buf.put(emptySketch);
+    }
+    finally {
+      buf.position(oldPosition);
+    }
+
+    // Add an HllSketch for this chunk to our sketchCache.
     final WritableMemory mem = getMemory(buf).writableRegion(position, size);
-    putSketchIntoCache(buf, position, new HllSketch(lgK, tgtHllType, mem));
+    putSketchIntoCache(buf, position, HllSketch.writableWrap(mem));
   }
 
   /**
@@ -162,7 +188,9 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator
 
   /**
    * compute lock index to avoid boxing in Striped.get() call
+   *
    * @param position
+   *
    * @return index
    */
   static int lockIndex(final int position)
@@ -172,7 +200,9 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator
 
   /**
    * see https://github.com/google/guava/blob/master/guava/src/com/google/common/util/concurrent/Striped.java#L536-L548
+   *
    * @param hashCode
+   *
    * @return smeared hashCode
    */
   private static int smear(int hashCode)
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java
index 0167e1a..aa27706 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java
@@ -40,7 +40,9 @@ import java.util.concurrent.locks.ReadWriteLock;
 public class HllSketchMergeBufferAggregator implements BufferAggregator
 {
 
-  /** for locking per buffer position (power of 2 to make index computation faster) */
+  /**
+   * for locking per buffer position (power of 2 to make index computation faster)
+   */
   private static final int NUM_STRIPES = 64;
 
   private final ColumnValueSelector<HllSketch> selector;
@@ -49,6 +51,13 @@ public class HllSketchMergeBufferAggregator implements BufferAggregator
   private final int size;
   private final Striped<ReadWriteLock> stripedLock = Striped.readWriteLock(NUM_STRIPES);
 
+  /**
+   * Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty Union image.
+   * {@link HllSketchBuildBufferAggregator} does something similar, but different enough that we don't share code. The
+   * "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link Union} objects.
+   */
+  private final byte[] emptyUnion;
+
   public HllSketchMergeBufferAggregator(
       final ColumnValueSelector<HllSketch> selector,
       final int lgK,
@@ -60,17 +69,29 @@ public class HllSketchMergeBufferAggregator implements BufferAggregator
     this.lgK = lgK;
     this.tgtHllType = tgtHllType;
     this.size = size;
+    this.emptyUnion = new byte[size];
+
+    //noinspection ResultOfObjectAllocationIgnored (Union writes to "emptyUnion" as a side effect of construction)
+    new Union(lgK, WritableMemory.wrap(emptyUnion));
   }
 
-  @SuppressWarnings("ResultOfObjectAllocationIgnored")
   @Override
   public void init(final ByteBuffer buf, final int position)
   {
-    final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN).writableRegion(position, size);
-    // Not necessary to keep the constructed object since it is cheap to reconstruct by wrapping the memory.
-    // The objects are not cached as in BuildBufferAggregator since they never exceed the max size and never move.
-    // So it is easier to reconstruct them by wrapping memory then to keep position-to-object mappings. 
-    new Union(lgK, mem);
+    // Copy prebuilt empty union object.
+    // Not necessary to cache a Union wrapper around the initialized memory, because:
+    //  - It is cheap to reconstruct by re-wrapping the memory in "aggregate" and "get".
+    //  - Unlike the HllSketch objects used by HllSketchBuildBufferAggregator, our Union objects never exceed the
+    //    max size and therefore do not need to be potentially moved in-heap.
+
+    final int oldPosition = buf.position();
+    try {
+      buf.position(position);
+      buf.put(emptyUnion);
+    }
+    finally {
+      buf.position(oldPosition);
+    }
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org