You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by GitBox <> on 2018/10/15 03:57:02 UTC

[GitHub] b-slim commented on a change in pull request #6327: Druid based on OakIncrementalIndex

b-slim commented on a change in pull request #6327: Druid based on OakIncrementalIndex

 File path: benchmarks/src/main/java/org/apache/druid/benchmark/query/OakTopNBenchmark.java
 @@ -0,0 +1,420 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.druid.benchmark.query;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
+import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
+import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
+import org.apache.druid.collections.StupidPool;
+import org.apache.druid.hll.HyperLogLogHash;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.offheap.OffheapBufferGenerator;
+import org.apache.druid.query.FinalizeResultsQueryRunner;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryRunnerFactory;
+import org.apache.druid.query.QueryToolChest;
+import org.apache.druid.query.Result;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleMinAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
+import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
+import org.apache.druid.query.ordering.StringComparators;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.query.spec.QuerySegmentSpec;
+import org.apache.druid.query.topn.DimensionTopNMetricSpec;
+import org.apache.druid.query.topn.TopNQuery;
+import org.apache.druid.query.topn.TopNQueryBuilder;
+import org.apache.druid.query.topn.TopNQueryConfig;
+import org.apache.druid.query.topn.TopNQueryQueryToolChest;
+import org.apache.druid.query.topn.TopNQueryRunnerFactory;
+import org.apache.druid.query.topn.TopNResultValue;
+import org.apache.druid.segment.IncrementalIndexSegment;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.serde.ComplexMetrics;
+import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.infra.Blackhole;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+@Fork(value = 1)
+@Warmup(iterations = 10)
+@Measurement(iterations = 25)
+public class OakTopNBenchmark
+  @Param({"1"}) // JMH parameter: number of segments built in setup(); also sizes the query thread pool
+  private int numSegments;
+  @Param({"750000"}) // JMH parameter: rows generated and added to each segment
+  private int rowsPerSegment;
+  @Param({"true", "false"}) // JMH parameter: forwarded to IncrementalIndexSchema.Builder.withRollup(...)
+  private boolean rollup;
+  @Param({"true", "false"}) // JMH parameter: true -> buildOnheap(), false -> buildOffheapOak() in makeIncIndex()
+  private boolean onheap;
+  @Param({"basic.A", "basic.numericSort", "basic.alphanumericSort"}) // "<schemaName>.<queryName>"; split on '.' in setup()
+  private String schemaAndQuery;
+  @Param({"10"}) // JMH parameter: TopN threshold applied to the selected query builder
+  private int threshold;
+  private static final Logger log = new Logger(OakTopNBenchmark.class);
+  private static final int RNG_SEED = 9999; // base seed; the generator for segment i is seeded with RNG_SEED + i
+  private static final IndexMergerV9 INDEX_MERGER_V9; // assigned in the static initializer; persists incremental indexes in setup()
+  private static final IndexIO INDEX_IO; // assigned in the static initializer; loads persisted indexes in setup()
+  public static final ObjectMapper JSON_MAPPER; // assigned in the static initializer; shared by INDEX_IO and INDEX_MERGER_V9
+  private List<IncrementalIndex> incIndexes; // one incremental index per segment, filled in setup()
+  private List<QueryableIndex> qIndexes; // persisted-and-reloaded counterpart of each incremental index
+  private QueryRunnerFactory factory; // TopNQueryRunnerFactory created at the end of setup()
+  private BenchmarkSchemaInfo schemaInfo; // schema looked up from the first half of schemaAndQuery
+  private TopNQueryBuilder queryBuilder; // builder looked up from SCHEMA_QUERY_MAP in setup()
+  private TopNQuery query; // the query executed by every benchmark method
+  private File tmpDir; // temp directory for persisted segments; deleted in tearDown()
+  private ExecutorService executorService; // passed to factory.mergeRunners(...) in queryMultiQueryableIndex()
+  static { // one-time initialization of the shared JSON mapper, index reader and index merger
+    JSON_MAPPER = new DefaultObjectMapper();
+    INDEX_IO = new IndexIO(
+        JSON_MAPPER,
+        new ColumnConfig()
+        {
+          @Override
+          public int columnCacheSizeBytes()
+          {
+            return 0; // column cache size is reported as 0 bytes
+          }
+        }
+    );
+    INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
+  }
+  private static final Map<String, Map<String, TopNQueryBuilder>> SCHEMA_QUERY_MAP = new LinkedHashMap<>(); // schema name -> (query name -> builder); populated by setupQueries()
+  private void setupQueries()
+  {
+    // queries for the basic schema
+    Map<String, TopNQueryBuilder> basicQueries = new LinkedHashMap<>();
+    BenchmarkSchemaInfo basicSchema = BenchmarkSchemas.SCHEMA_MAP.get("basic");
+    { // basic.A
+      QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval()));
+      List<AggregatorFactory> queryAggs = new ArrayList<>();
+      queryAggs.add(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential"));
+      queryAggs.add(new LongMaxAggregatorFactory("maxLongUniform", "maxLongUniform"));
+      queryAggs.add(new DoubleSumAggregatorFactory("sumFloatNormal", "sumFloatNormal"));
+      queryAggs.add(new DoubleMinAggregatorFactory("minFloatZipf", "minFloatZipf"));
+      queryAggs.add(new HyperUniquesAggregatorFactory("hyperUniquesMet", "hyper"));
+      TopNQueryBuilder queryBuilderA = new TopNQueryBuilder()
+              .dataSource("blah")
+              .granularity(Granularities.ALL)
+              .dimension("dimSequential")
+              .metric("sumFloatNormal")
+              .intervals(intervalSpec)
+              .aggregators(queryAggs);
+      basicQueries.put("A", queryBuilderA);
+    }
+    { // basic.numericSort
+      QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval()));
+      List<AggregatorFactory> queryAggs = new ArrayList<>();
+      queryAggs.add(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential"));
+      TopNQueryBuilder queryBuilderA = new TopNQueryBuilder()
+              .dataSource("blah")
+              .granularity(Granularities.ALL)
+              .dimension("dimUniform")
+              .metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC))
+              .intervals(intervalSpec)
+              .aggregators(queryAggs);
+      basicQueries.put("numericSort", queryBuilderA);
+    }
+    { // basic.alphanumericSort
+      QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval()));
+      List<AggregatorFactory> queryAggs = new ArrayList<>();
+      queryAggs.add(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential"));
+      TopNQueryBuilder queryBuilderA = new TopNQueryBuilder()
+              .dataSource("blah")
+              .granularity(Granularities.ALL)
+              .dimension("dimUniform")
+              .metric(new DimensionTopNMetricSpec(null, StringComparators.ALPHANUMERIC))
+              .intervals(intervalSpec)
+              .aggregators(queryAggs);
+      basicQueries.put("alphanumericSort", queryBuilderA);
+    }
+    SCHEMA_QUERY_MAP.put("basic", basicQueries);
+  }
+  /**
+   * Prepares the state used by the benchmark methods: registers the hyperUnique serde if absent,
+   * creates the query thread pool, resolves the schema and query named by {@code schemaAndQuery},
+   * generates {@code numSegments} incremental indexes of {@code rowsPerSegment} rows each,
+   * persists and reloads each of them as a queryable index, and creates the TopN runner factory.
+   *
+   * @throws IOException declared for the index persist/load calls made here
+   */
+  @Setup
+  public void setup() throws IOException
+  {
+    log.info("SETUP CALLED AT " + System.currentTimeMillis());
+    if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
+      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault()));
+    }
+    executorService = Execs.multiThreaded(numSegments, "TopNThreadPool");
+    setupQueries();
+    // schemaAndQuery has the form "<schemaName>.<queryName>"
+    String[] schemaQuery = schemaAndQuery.split("\\.");
+    String schemaName = schemaQuery[0];
+    String queryName = schemaQuery[1];
+    schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName);
+    queryBuilder = SCHEMA_QUERY_MAP.get(schemaName).get(queryName);
+    queryBuilder.threshold(threshold);
+    query = queryBuilder.build();
+    incIndexes = new ArrayList<>();
+    for (int i = 0; i < numSegments; i++) {
+      log.info("Generating rows for segment " + i);
+      // each segment gets its own seed so segments do not contain identical rows
+      BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
+              schemaInfo.getColumnSchemas(),
+              RNG_SEED + i,
+              schemaInfo.getDataInterval(),
+              rowsPerSegment
+      );
+      IncrementalIndex incIndex = makeIncIndex();
+      for (int j = 0; j < rowsPerSegment; j++) {
+        InputRow row = gen.nextRow();
+        if (j % 10000 == 0) {
+          log.info(j + " rows generated.");
+        }
+        incIndex.add(row);
+      }
+      incIndexes.add(incIndex);
+    }
+    tmpDir = Files.createTempDir();
+    log.info("Using temp dir: " + tmpDir.getAbsolutePath());
+    qIndexes = new ArrayList<>();
+    for (int i = 0; i < numSegments; i++) {
+      File indexFile = INDEX_MERGER_V9.persist(
+              incIndexes.get(i),
+              tmpDir,
+              new IndexSpec(),
+              null
+      );
+      QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile);
+      qIndexes.add(qIndex);
+    }
+    factory = new TopNQueryRunnerFactory(
+            new StupidPool<>(
+                    "TopNBenchmark-compute-bufferPool",
+                    new OffheapBufferGenerator("compute", 250000000),
+                    0,
+                    Integer.MAX_VALUE
+            ),
+            new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()),
+            QueryBenchmarkUtil.NOOP_QUERYWATCHER
+    );
+  }
+  @TearDown
+  public void tearDown() throws IOException
+  {
+    FileUtils.deleteDirectory(tmpDir);
+  }
+  private IncrementalIndex makeIncIndex()
+  {
+    if (onheap) {
+      return new IncrementalIndex.Builder()
+              .setIndexSchema(
+                      new IncrementalIndexSchema.Builder()
+                              .withMetrics(schemaInfo.getAggsArray())
+                              .withRollup(rollup)
+                              .build()
+              )
+              .setReportParseExceptions(false)
+              .setMaxRowCount(rowsPerSegment * 16)
+              .buildOnheap();
+    } else {
+      return new IncrementalIndex.Builder()
+              .setIndexSchema(
+                      new IncrementalIndexSchema.Builder()
+                              .withMetrics(schemaInfo.getAggsArray())
+                              .withRollup(rollup)
+                              .build()
+              )
+              .setReportParseExceptions(false)
+              .setMaxRowCount(rowsPerSegment * 16)
+              .buildOffheapOak();
+    }
+  }
+  /**
+   * Runs {@code query} through {@code runner}, wrapped with the factory tool chest's pre-merge
+   * decoration, result merging and result finalization, and materializes the results.
+   *
+   * @param factory supplies the tool chest used to decorate, merge and finalize
+   * @param runner  the per-segment runner to execute
+   * @param query   the query to run
+   * @return the query results collected into a list
+   */
+  private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)
+  {
+    QueryToolChest toolChest = factory.getToolchest();
+    QueryRunner<T> theRunner = new FinalizeResultsQueryRunner<>(
+            toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)),
+            toolChest
+    );
+    Sequence<T> queryResult = theRunner.run(QueryPlus.wrap(query), Maps.newHashMap());
+    return queryResult.toList();
+  }
+  /**
+   * Runs the TopN query against the first incremental index, feeds every result to the
+   * blackhole, and logs the observed rows-per-millisecond figure.
+   *
+   * @param blackhole JMH sink that keeps the results from being optimized away
+   */
+  @Benchmark
+  @BenchmarkMode(Mode.SingleShotTime)
+  @OutputTimeUnit(TimeUnit.SECONDS)
+  public void querySingleIncrementalIndex(Blackhole blackhole)
+  {
+    long time = System.currentTimeMillis();
+    QueryRunner<Result<TopNResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
+            factory,
+            "incIndex",
+            new IncrementalIndexSegment(incIndexes.get(0), "incIndex")
+    );
+    List<Result<TopNResultValue>> results = OakTopNBenchmark.runQuery(factory, runner, query);
+    for (Result<TopNResultValue> result : results) {
+      blackhole.consume(result);
+    }
+    long duration = System.currentTimeMillis() - time;
+    // double division: a 0 ms duration yields Infinity rather than throwing
+    double throughput = rowsPerSegment / (double) duration;
+    log.info("Throughput: " + throughput + " ops/ms");
+  }
+  /**
+   * Runs the TopN query against the first persisted (queryable) index, feeds every result to
+   * the blackhole, and logs the observed rows-per-millisecond figure.
+   *
+   * @param blackhole JMH sink that keeps the results from being optimized away
+   */
+  @Benchmark
+  @BenchmarkMode(Mode.SingleShotTime)
+  @OutputTimeUnit(TimeUnit.SECONDS)
+  public void querySingleQueryableIndex(Blackhole blackhole)
+  {
+    long time = System.currentTimeMillis();
+    final QueryRunner<Result<TopNResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
+            factory,
+            "qIndex",
+            new QueryableIndexSegment("qIndex", qIndexes.get(0))
+    );
+    List<Result<TopNResultValue>> results = OakTopNBenchmark.runQuery(factory, runner, query);
+    for (Result<TopNResultValue> result : results) {
+      blackhole.consume(result);
+    }
+    long duration = System.currentTimeMillis() - time;
+    // double division: a 0 ms duration yields Infinity rather than throwing
+    double throughput = rowsPerSegment / (double) duration;
+    log.info("Throughput: " + throughput + " ops/ms");
+  }
+  /**
+   * Runs the TopN query across all {@code numSegments} queryable indexes, merging the
+   * per-segment runners on {@code executorService}, feeds every result to the blackhole, and
+   * logs the observed rows-per-millisecond figure over all queried segments.
+   *
+   * @param blackhole JMH sink that keeps the results from being optimized away
+   */
+  @Benchmark
+  @BenchmarkMode(Mode.SingleShotTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public void queryMultiQueryableIndex(Blackhole blackhole)
+  {
+    long time = System.currentTimeMillis();
+    List<QueryRunner<Result<TopNResultValue>>> singleSegmentRunners = Lists.newArrayList();
+    QueryToolChest toolChest = factory.getToolchest();
+    for (int i = 0; i < numSegments; i++) {
+      String segmentName = "qIndex" + i;
+      QueryRunner<Result<TopNResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
+              factory,
+              segmentName,
+              new QueryableIndexSegment(segmentName, qIndexes.get(i))
+      );
+      singleSegmentRunners.add(toolChest.preMergeQueryDecoration(runner));
+    }
+    QueryRunner theRunner = toolChest.postMergeQueryDecoration(
+            new FinalizeResultsQueryRunner<>(
+                    toolChest.mergeResults(factory.mergeRunners(executorService, singleSegmentRunners)),
+                    toolChest
+            )
+    );
+    Sequence<Result<TopNResultValue>> queryResult = theRunner.run(
+            QueryPlus.wrap(query),
+            Maps.newHashMap()
+    );
+    List<Result<TopNResultValue>> results = queryResult.toList();
+    for (Result<TopNResultValue> result : results) {
+      blackhole.consume(result);
+    }
+    long duration = System.currentTimeMillis() - time;
+    // This query scans every segment, so the row count is rowsPerSegment * numSegments
+    // (widened to long to avoid int overflow for large parameter values).
+    long totalRows = (long) rowsPerSegment * numSegments;
+    double throughput = totalRows / (double) duration;
+    log.info("Throughput: " + throughput + " ops/ms");
+  }
 Review comment:
   Can you please attach the benchmark results before and after this PR? 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:

With regards,
Apache Git Services

To unsubscribe, e-mail:
For additional commands, e-mail: