You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@druid.apache.org by gi...@apache.org on 2019/07/31 15:18:52 UTC
[incubator-druid] branch master updated: HllSketch Merge/Build BufferAggregators: Speed up init with prebuilt sketch. (#8194)
This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 6346131 HllSketch Merge/Build BufferAggregators: Speed up init with prebuilt sketch. (#8194)
6346131 is described below
commit 63461311f8d38fa86d7007756f2320d7829ccd9a
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Wed Jul 31 08:18:42 2019 -0700
HllSketch Merge/Build BufferAggregators: Speed up init with prebuilt sketch. (#8194)
* HllSketchMergeBufferAggregator: Speed up init by copying prebuilt sketch.
* Remove useless writableRegion call.
* POM variables.
* Fix missing reposition.
* Apply similar optimization to HllSketchBuildBufferAggregator.
* Rename emptySketch -> emptyUnion in merge flavor.
* Adjustments based on review.
* Comment update.
* Additional updates.
* Comment push.
---
.../druid/benchmark/DataSketchesHllBenchmark.java | 123 +++++++++++++++++++++
extensions-core/datasketches/pom.xml | 12 +-
.../hll/HllSketchBuildBufferAggregator.java | 34 +++++-
.../hll/HllSketchMergeBufferAggregator.java | 35 ++++--
4 files changed, 194 insertions(+), 10 deletions(-)
diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/DataSketchesHllBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/DataSketchesHllBenchmark.java
new file mode 100644
index 0000000..3493f55
--- /dev/null
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/DataSketchesHllBenchmark.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.benchmark;
+
+import com.yahoo.sketches.hll.HllSketch;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.aggregation.datasketches.hll.HllSketchMergeAggregatorFactory;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+@Fork(1)
+@State(Scope.Benchmark)
+public class DataSketchesHllBenchmark
+{
+ private final AggregatorFactory aggregatorFactory = new HllSketchMergeAggregatorFactory(
+ "hll",
+ "hll",
+ null,
+ null,
+ false
+ );
+
+ private final ByteBuffer buf = ByteBuffer.allocateDirect(aggregatorFactory.getMaxIntermediateSize());
+
+ private BufferAggregator aggregator;
+
+ @Setup(Level.Trial)
+ public void setUp()
+ {
+ aggregator = aggregatorFactory.factorizeBuffered(
+ new ColumnSelectorFactory()
+ {
+ @Override
+ public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
+ {
+ return null;
+ }
+
+ @Override
+ public ColumnValueSelector makeColumnValueSelector(String columnName)
+ {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public ColumnCapabilities getColumnCapabilities(String column)
+ {
+ return null;
+ }
+ }
+ );
+ }
+
+ @TearDown(Level.Trial)
+ public void tearDown()
+ {
+ aggregator.close();
+ aggregator = null;
+ }
+
+ @Benchmark
+ public void init(Blackhole bh)
+ {
+ aggregator.init(buf, 0);
+ }
+
+ @Benchmark
+ public Object initAndGet()
+ {
+ aggregator.init(buf, 0);
+ return aggregator.get(buf, 0);
+ }
+
+ @Benchmark
+ public Object initAndSerde()
+ {
+ aggregator.init(buf, 0);
+ return aggregatorFactory.deserialize(((HllSketch) aggregator.get(buf, 0)).toCompactByteArray());
+ }
+}
diff --git a/extensions-core/datasketches/pom.xml b/extensions-core/datasketches/pom.xml
index db91883..151b554 100644
--- a/extensions-core/datasketches/pom.xml
+++ b/extensions-core/datasketches/pom.xml
@@ -34,11 +34,16 @@
<relativePath>../../pom.xml</relativePath>
</parent>
+ <properties>
+ <datasketches.core.version>0.13.4</datasketches.core.version>
+ <datasketches.memory.version>0.12.2</datasketches.memory.version>
+ </properties>
+
<dependencies>
<dependency>
<groupId>com.yahoo.datasketches</groupId>
<artifactId>sketches-core</artifactId>
- <version>0.13.4</version>
+ <version>${datasketches.core.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
@@ -47,6 +52,11 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>com.yahoo.datasketches</groupId>
+ <artifactId>memory</artifactId>
+ <version>${datasketches.memory.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
</dependency>
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java
index 5532866..5789127 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java
@@ -23,6 +23,7 @@ import com.google.common.util.concurrent.Striped;
import com.yahoo.memory.WritableMemory;
import com.yahoo.sketches.hll.HllSketch;
import com.yahoo.sketches.hll.TgtHllType;
+import com.yahoo.sketches.hll.Union;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.druid.query.aggregation.BufferAggregator;
@@ -42,7 +43,9 @@ import java.util.concurrent.locks.ReadWriteLock;
public class HllSketchBuildBufferAggregator implements BufferAggregator
{
- /** for locking per buffer position (power of 2 to make index computation faster) */
+ /**
+ * for locking per buffer position (power of 2 to make index computation faster)
+ */
private static final int NUM_STRIPES = 64;
private final ColumnValueSelector<Object> selector;
@@ -53,6 +56,13 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator
private final IdentityHashMap<ByteBuffer, Int2ObjectMap<HllSketch>> sketchCache = new IdentityHashMap<>();
private final Striped<ReadWriteLock> stripedLock = Striped.readWriteLock(NUM_STRIPES);
+ /**
+ * Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty HllSketch image.
+ * {@link HllSketchMergeBufferAggregator} does something similar, but different enough that we don't share code. The
+ * "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link Union} objects.
+ */
+ private final byte[] emptySketch;
+
public HllSketchBuildBufferAggregator(
final ColumnValueSelector<Object> selector,
final int lgK,
@@ -64,13 +74,29 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator
this.lgK = lgK;
this.tgtHllType = tgtHllType;
this.size = size;
+ this.emptySketch = new byte[size];
+
+ //noinspection ResultOfObjectAllocationIgnored (HllSketch writes to "emptySketch" as a side effect of construction)
+ new HllSketch(lgK, tgtHllType, WritableMemory.wrap(emptySketch));
}
@Override
public void init(final ByteBuffer buf, final int position)
{
+ // Copy prebuilt empty sketch object.
+
+ final int oldPosition = buf.position();
+ try {
+ buf.position(position);
+ buf.put(emptySketch);
+ }
+ finally {
+ buf.position(oldPosition);
+ }
+
+ // Add an HllSketch for this chunk to our sketchCache.
final WritableMemory mem = getMemory(buf).writableRegion(position, size);
- putSketchIntoCache(buf, position, new HllSketch(lgK, tgtHllType, mem));
+ putSketchIntoCache(buf, position, HllSketch.writableWrap(mem));
}
/**
@@ -162,7 +188,9 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator
/**
* compute lock index to avoid boxing in Striped.get() call
+ *
* @param position
+ *
* @return index
*/
static int lockIndex(final int position)
@@ -172,7 +200,9 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator
/**
* see https://github.com/google/guava/blob/master/guava/src/com/google/common/util/concurrent/Striped.java#L536-L548
+ *
* @param hashCode
+ *
* @return smeared hashCode
*/
private static int smear(int hashCode)
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java
index 0167e1a..aa27706 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java
@@ -40,7 +40,9 @@ import java.util.concurrent.locks.ReadWriteLock;
public class HllSketchMergeBufferAggregator implements BufferAggregator
{
- /** for locking per buffer position (power of 2 to make index computation faster) */
+ /**
+ * for locking per buffer position (power of 2 to make index computation faster)
+ */
private static final int NUM_STRIPES = 64;
private final ColumnValueSelector<HllSketch> selector;
@@ -49,6 +51,13 @@ public class HllSketchMergeBufferAggregator implements BufferAggregator
private final int size;
private final Striped<ReadWriteLock> stripedLock = Striped.readWriteLock(NUM_STRIPES);
+ /**
+ * Used by {@link #init(ByteBuffer, int)}. We initialize by copying a prebuilt empty Union image.
+ * {@link HllSketchBuildBufferAggregator} does something similar, but different enough that we don't share code. The
+ * "build" flavor uses {@link HllSketch} objects and the "merge" flavor uses {@link Union} objects.
+ */
+ private final byte[] emptyUnion;
+
public HllSketchMergeBufferAggregator(
final ColumnValueSelector<HllSketch> selector,
final int lgK,
@@ -60,17 +69,29 @@ public class HllSketchMergeBufferAggregator implements BufferAggregator
this.lgK = lgK;
this.tgtHllType = tgtHllType;
this.size = size;
+ this.emptyUnion = new byte[size];
+
+ //noinspection ResultOfObjectAllocationIgnored (Union writes to "emptyUnion" as a side effect of construction)
+ new Union(lgK, WritableMemory.wrap(emptyUnion));
}
- @SuppressWarnings("ResultOfObjectAllocationIgnored")
@Override
public void init(final ByteBuffer buf, final int position)
{
- final WritableMemory mem = WritableMemory.wrap(buf, ByteOrder.LITTLE_ENDIAN).writableRegion(position, size);
- // Not necessary to keep the constructed object since it is cheap to reconstruct by wrapping the memory.
- // The objects are not cached as in BuildBufferAggregator since they never exceed the max size and never move.
- // So it is easier to reconstruct them by wrapping memory then to keep position-to-object mappings.
- new Union(lgK, mem);
+ // Copy prebuilt empty union object.
+ // Not necessary to cache a Union wrapper around the initialized memory, because:
+ // - It is cheap to reconstruct by re-wrapping the memory in "aggregate" and "get".
+ // - Unlike the HllSketch objects used by HllSketchBuildBufferAggregator, our Union objects never exceed the
+ // max size and therefore do not need to be potentially moved in-heap.
+
+ final int oldPosition = buf.position();
+ try {
+ buf.position(position);
+ buf.put(emptyUnion);
+ }
+ finally {
+ buf.position(oldPosition);
+ }
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org