You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2021/01/08 06:19:04 UTC
[druid] branch master updated: IncrementalIndex Tests and
Benchmarks Parametrization (#10593)
This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 08ab82f IncrementalIndex Tests and Benchmarks Parametrization (#10593)
08ab82f is described below
commit 08ab82f55ca856d60dfe1088c1c0393428b0bb6d
Author: Liran Funaro <li...@verizonmedia.com>
AuthorDate: Fri Jan 8 08:18:47 2021 +0200
IncrementalIndex Tests and Benchmarks Parametrization (#10593)
* Remove redundant IncrementalIndex.Builder
* Parametrize incremental index tests and benchmarks
- Reveal and fix a bug in OffheapIncrementalIndex
* Fix forbiddenapis error: Forbidden method invocation: java.lang.String#format(java.lang.String,java.lang.Object[]) [Uses default locale]
* Fix Intellij errors: declared exception is never thrown
* Add documentation and validate before closing objects on tearDown.
* Add documentation to OffheapIncrementalIndexTestSpec
* Doc corrections and minor changes.
* Add logging for generated rows.
* Refactor new tests/benchmarks.
* Improve IncrementalIndexCreator documentation
* Add required tests for DataGenerator
* Revert "rollupOpportunity" to be a string
---
.../druid/benchmark/FilterPartitionBenchmark.java | 5 +-
.../benchmark/FilteredAggregatorBenchmark.java | 184 +++++++++++-----
.../benchmark/GroupByTypeInterfaceBenchmark.java | 5 +-
.../IncrementalIndexRowTypeBenchmark.java | 58 +++--
.../benchmark/TopNTypeInterfaceBenchmark.java | 5 +-
.../indexing/IncrementalIndexReadBenchmark.java | 35 +--
.../indexing/IndexIngestionBenchmark.java | 51 +++--
.../benchmark/indexing/IndexMergeBenchmark.java | 16 +-
.../benchmark/indexing/IndexPersistBenchmark.java | 105 +++++----
.../druid/benchmark/query/GroupByBenchmark.java | 230 ++++++++++---------
.../druid/benchmark/query/ScanBenchmark.java | 180 +++++++++------
.../druid/benchmark/query/SearchBenchmark.java | 174 +++++++++------
.../druid/benchmark/query/TimeseriesBenchmark.java | 189 ++++++++++------
.../druid/benchmark/query/TopNBenchmark.java | 173 +++++++++------
.../query/timecompare/TimeCompareBenchmark.java | 5 +-
.../DistinctCountGroupByQueryTest.java | 5 +-
.../DistinctCountTimeseriesQueryTest.java | 5 +-
.../distinctcount/DistinctCountTopNQueryTest.java | 5 +-
.../druid/segment/MapVirtualColumnTestBase.java | 5 +-
.../overlord/sampler/InputSourceSampler.java | 5 +-
.../firehose/IngestSegmentFirehoseFactoryTest.java | 5 +-
.../IngestSegmentFirehoseFactoryTimelineTest.java | 5 +-
.../druid/segment/generator/DataGenerator.java | 91 +++++++-
.../incremental/AppendableIndexBuilder.java | 2 +-
.../segment/incremental/IncrementalIndex.java | 60 -----
.../incremental/OffheapIncrementalIndex.java | 35 +--
.../org/apache/druid/query/DoubleStorageTest.java | 5 +-
.../druid/query/MultiValuedDimensionTest.java | 9 +-
.../query/aggregation/AggregationTestHelper.java | 13 +-
.../first/StringFirstTimeseriesQueryTest.java | 5 +-
.../last/StringLastTimeseriesQueryTest.java | 5 +-
.../DataSourceMetadataQueryTest.java | 5 +-
...GroupByLimitPushDownInsufficientBufferTest.java | 5 +-
.../GroupByLimitPushDownMultiNodeMergeTest.java | 5 +-
.../query/groupby/GroupByMultiSegmentTest.java | 5 +-
.../groupby/GroupByQueryRunnerFactoryTest.java | 5 +-
.../query/groupby/NestedQueryPushDownTest.java | 5 +-
.../druid/query/metadata/SegmentAnalyzerTest.java | 5 +-
.../query/scan/MultiSegmentScanQueryTest.java | 5 +-
.../druid/query/search/SearchQueryRunnerTest.java | 5 +-
.../timeboundary/TimeBoundaryQueryRunnerTest.java | 5 +-
.../timeseries/TimeseriesQueryRunnerBonusTest.java | 5 +-
.../org/apache/druid/segment/EmptyIndexTest.java | 5 +-
.../org/apache/druid/segment/IndexBuilder.java | 5 +-
.../java/org/apache/druid/segment/IndexIOTest.java | 9 +-
.../apache/druid/segment/IndexMergerTestBase.java | 77 +++----
.../segment/IndexMergerV9CompatibilityTest.java | 5 +-
.../segment/IndexMergerV9WithSpatialIndexTest.java | 17 +-
.../apache/druid/segment/SchemalessIndexTest.java | 13 +-
.../java/org/apache/druid/segment/TestIndex.java | 5 +-
.../druid/segment/data/IncrementalIndexTest.java | 174 ++++++---------
.../segment/filter/SpatialFilterBonusTest.java | 17 +-
.../druid/segment/filter/SpatialFilterTest.java | 17 +-
.../druid/segment/generator/DataGeneratorTest.java | 131 ++++++++++-
.../incremental/IncrementalIndexAdapterTest.java | 34 ++-
.../incremental/IncrementalIndexCreator.java | 244 +++++++++++++++++++++
...est.java => IncrementalIndexIngestionTest.java} | 127 ++++++-----
.../IncrementalIndexMultiValueSpecTest.java | 32 ++-
.../incremental/IncrementalIndexRowCompTest.java | 33 ++-
.../incremental/IncrementalIndexRowSizeTest.java | 53 +++--
.../IncrementalIndexStorageAdapterTest.java | 57 ++---
.../segment/incremental/IncrementalIndexTest.java | 104 ++-------
.../OffheapIncrementalIndexTestSpec.java | 107 +++++++++
.../segment/virtual/ExpressionSelectorsTest.java | 3 +-
.../firehose/IngestSegmentFirehoseTest.java | 9 +-
65 files changed, 1932 insertions(+), 1076 deletions(-)
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/FilterPartitionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/FilterPartitionBenchmark.java
index f7a690f..6908b72 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/FilterPartitionBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/FilterPartitionBenchmark.java
@@ -70,6 +70,7 @@ import org.apache.druid.segment.generator.DataGenerator;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.joda.time.Interval;
@@ -227,10 +228,10 @@ public class FilterPartitionBenchmark
private IncrementalIndex makeIncIndex()
{
- return new IncrementalIndex.Builder()
+ return new OnheapIncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
.setMaxRowCount(rowsPerSegment)
- .buildOnheap();
+ .build();
}
@Benchmark
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java
index 560148b..fab0831 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java
@@ -19,6 +19,7 @@
package org.apache.druid.benchmark;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.benchmark.query.QueryBenchmarkUtil;
@@ -68,13 +69,17 @@ import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.generator.DataGenerator;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexCreator;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.SegmentId;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
@@ -88,7 +93,6 @@ import org.openjdk.jmh.infra.Blackhole;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -113,22 +117,22 @@ public class FilteredAggregatorBenchmark
@Param({"false", "true"})
private String vectorize;
+ @Param({"true", "false"})
+ private boolean descending;
+
private static final Logger log = new Logger(FilteredAggregatorBenchmark.class);
private static final int RNG_SEED = 9999;
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
public static final ObjectMapper JSON_MAPPER;
- private IncrementalIndex incIndex;
- private IncrementalIndex incIndexFilteredAgg;
- private AggregatorFactory[] filteredMetrics;
- private QueryableIndex qIndex;
- private File indexFile;
+
+ private AppendableIndexSpec appendableIndexSpec;
+ private AggregatorFactory filteredMetric;
private DimFilter filter;
- private List<InputRow> inputRows;
+ private DataGenerator generator;
private QueryRunnerFactory factory;
private GeneratorSchemaInfo schemaInfo;
private TimeseriesQuery query;
- private File tmpDir;
static {
JSON_MAPPER = new DefaultObjectMapper();
@@ -146,8 +150,11 @@ public class FilteredAggregatorBenchmark
INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
}
+ /**
+ * Set up everything common for benchmarking both the incremental-index and the queryable-index.
+ */
@Setup
- public void setup() throws IOException
+ public void setup()
{
log.info("SETUP CALLED AT " + System.currentTimeMillis());
@@ -155,15 +162,13 @@ public class FilteredAggregatorBenchmark
schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schema);
- DataGenerator gen = new DataGenerator(
+ generator = new DataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED,
schemaInfo.getDataInterval(),
rowsPerSegment
);
- incIndex = makeIncIndex(schemaInfo.getAggsArray());
-
filter = new OrDimFilter(
Arrays.asList(
new BoundDimFilter("dimSequential", "-1", "-1", true, true, null, null, StringComparators.ALPHANUMERIC),
@@ -172,30 +177,7 @@ public class FilteredAggregatorBenchmark
new InDimFilter("dimSequential", Collections.singletonList("X"), null)
)
);
- filteredMetrics = new AggregatorFactory[1];
- filteredMetrics[0] = new FilteredAggregatorFactory(new CountAggregatorFactory("rows"), filter);
- incIndexFilteredAgg = makeIncIndex(filteredMetrics);
-
- inputRows = new ArrayList<>();
- for (int j = 0; j < rowsPerSegment; j++) {
- InputRow row = gen.nextRow();
- if (j % 10000 == 0) {
- log.info(j + " rows generated.");
- }
- incIndex.add(row);
- inputRows.add(row);
- }
-
- tmpDir = FileUtils.createTempDir();
- log.info("Using temp dir: " + tmpDir.getAbsolutePath());
-
- indexFile = INDEX_MERGER_V9.persist(
- incIndex,
- tmpDir,
- new IndexSpec(),
- null
- );
- qIndex = INDEX_IO.loadIndex(indexFile);
+ filteredMetric = new FilteredAggregatorFactory(new CountAggregatorFactory("rows"), filter);
factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(),
@@ -205,30 +187,127 @@ public class FilteredAggregatorBenchmark
GeneratorSchemaInfo basicSchema = GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval()));
- List<AggregatorFactory> queryAggs = new ArrayList<>();
- queryAggs.add(filteredMetrics[0]);
+ List<AggregatorFactory> queryAggs = Collections.singletonList(filteredMetric);
query = Druids.newTimeseriesQueryBuilder()
.dataSource("blah")
.granularity(Granularities.ALL)
.intervals(intervalSpec)
.aggregators(queryAggs)
- .descending(false)
+ .descending(descending)
.build();
}
- @TearDown
- public void tearDown() throws IOException
+ /**
+ * Setup/teardown everything specific for benchmarking the incremental-index.
+ */
+ @State(Scope.Benchmark)
+ public static class IncrementalIndexState
+ {
+ @Param({"onheap", "offheap"})
+ private String indexType;
+
+ IncrementalIndex<?> incIndex;
+
+ @Setup
+ public void setup(FilteredAggregatorBenchmark global) throws JsonProcessingException
+ {
+ // Creates an AppendableIndexSpec that corresponds to the indexType parametrization.
+ // It is used in {@code global.makeIncIndex()} to instantiate an incremental-index of the specified type.
+ global.appendableIndexSpec = IncrementalIndexCreator.parseIndexType(indexType);
+ incIndex = global.makeIncIndex(global.schemaInfo.getAggsArray());
+ global.generator.addToIndex(incIndex, global.rowsPerSegment);
+ }
+
+ @TearDown
+ public void tearDown()
+ {
+ if (incIndex != null) {
+ incIndex.close();
+ }
+ }
+ }
+
+ /**
+ * Setup/teardown everything specific for benchmarking the ingestion of the incremental-index.
+ */
+ @State(Scope.Benchmark)
+ public static class IncrementalIndexIngestState
+ {
+ @Param({"onheap", "offheap"})
+ private String indexType;
+
+ IncrementalIndex<?> incIndex;
+ List<InputRow> inputRows;
+
+ @Setup(Level.Invocation)
+ public void setup(FilteredAggregatorBenchmark global) throws JsonProcessingException
+ {
+ // Creates an AppendableIndexSpec that corresponds to the indexType parametrization.
+ // It is used in {@code global.makeIncIndex()} to instantiate an incremental-index of the specified type.
+ global.appendableIndexSpec = IncrementalIndexCreator.parseIndexType(indexType);
+ inputRows = global.generator.toList(global.rowsPerSegment);
+ incIndex = global.makeIncIndex(new AggregatorFactory[]{global.filteredMetric});
+ }
+
+ @TearDown(Level.Invocation)
+ public void tearDown()
+ {
+ if (incIndex != null) {
+ incIndex.close();
+ }
+ }
+ }
+
+ /**
+ * Setup/teardown everything specific for benchmarking the queryable-index.
+ */
+ @State(Scope.Benchmark)
+ public static class QueryableIndexState
{
- FileUtils.deleteDirectory(tmpDir);
+ private File qIndexesDir;
+ private QueryableIndex qIndex;
+
+ @Setup
+ public void setup(FilteredAggregatorBenchmark global) throws IOException
+ {
+ global.appendableIndexSpec = new OnheapIncrementalIndex.Spec();
+
+ IncrementalIndex<?> incIndex = global.makeIncIndex(global.schemaInfo.getAggsArray());
+ global.generator.addToIndex(incIndex, global.rowsPerSegment);
+
+ qIndexesDir = FileUtils.createTempDir();
+ log.info("Using temp dir: " + qIndexesDir.getAbsolutePath());
+
+ File indexFile = INDEX_MERGER_V9.persist(
+ incIndex,
+ qIndexesDir,
+ new IndexSpec(),
+ null
+ );
+ incIndex.close();
+
+ qIndex = INDEX_IO.loadIndex(indexFile);
+ }
+
+ /**
+ * Closes the loaded queryable index and removes the temporary directory created in {@code setup}.
+ *
+ * @throws IOException if the temporary directory cannot be deleted
+ */
+ @TearDown
+ public void tearDown() throws IOException
+ {
+ if (qIndex != null) {
+ qIndex.close();
+ }
+ if (qIndexesDir != null) {
+ // File.delete() silently fails on a non-empty directory, and this directory was handed to persist()
+ // as its output location; delete it recursively, as the tearDown replaced by this change did.
+ FileUtils.deleteDirectory(qIndexesDir);
+ }
+ }
}
- private IncrementalIndex makeIncIndex(AggregatorFactory[] metrics)
+ private IncrementalIndex<?> makeIncIndex(AggregatorFactory[] metrics)
{
- return new IncrementalIndex.Builder()
+ return appendableIndexSpec.builder()
.setSimpleTestingIndexSchema(metrics)
.setMaxRowCount(rowsPerSegment)
- .buildOnheap();
+ .build();
}
private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query, String vectorize)
@@ -254,11 +333,10 @@ public class FilteredAggregatorBenchmark
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
- public void ingest(Blackhole blackhole) throws Exception
+ public void ingest(Blackhole blackhole, IncrementalIndexIngestState state) throws Exception
{
- incIndexFilteredAgg = makeIncIndex(filteredMetrics);
- for (InputRow row : inputRows) {
- int rv = incIndexFilteredAgg.add(row).getRowCount();
+ for (InputRow row : state.inputRows) {
+ int rv = state.incIndex.add(row).getRowCount();
blackhole.consume(rv);
}
}
@@ -266,12 +344,12 @@ public class FilteredAggregatorBenchmark
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
- public void querySingleIncrementalIndex(Blackhole blackhole)
+ public void querySingleIncrementalIndex(Blackhole blackhole, IncrementalIndexState state)
{
QueryRunner<Result<TimeseriesResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
SegmentId.dummy("incIndex"),
- new IncrementalIndexSegment(incIndex, SegmentId.dummy("incIndex"))
+ new IncrementalIndexSegment(state.incIndex, SegmentId.dummy("incIndex"))
);
List<Result<TimeseriesResultValue>> results = FilteredAggregatorBenchmark.runQuery(
@@ -288,12 +366,12 @@ public class FilteredAggregatorBenchmark
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
- public void querySingleQueryableIndex(Blackhole blackhole)
+ public void querySingleQueryableIndex(Blackhole blackhole, QueryableIndexState state)
{
final QueryRunner<Result<TimeseriesResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
SegmentId.dummy("qIndex"),
- new QueryableIndexSegment(qIndex, SegmentId.dummy("qIndex"))
+ new QueryableIndexSegment(state.qIndex, SegmentId.dummy("qIndex"))
);
List<Result<TimeseriesResultValue>> results = FilteredAggregatorBenchmark.runQuery(
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
index 1921b43..19fe385 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
@@ -70,6 +70,7 @@ import org.apache.druid.segment.generator.DataGenerator;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.SegmentId;
@@ -413,11 +414,11 @@ public class GroupByTypeInterfaceBenchmark
private IncrementalIndex makeIncIndex()
{
- return new IncrementalIndex.Builder()
+ return new OnheapIncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
.setConcurrentEventAdd(true)
.setMaxRowCount(rowsPerSegment)
- .buildOnheap();
+ .build();
}
@TearDown(Level.Trial)
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java
index cc8d4a3..572d0cb 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java
@@ -19,6 +19,7 @@
package org.apache.druid.benchmark;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
@@ -28,13 +29,15 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexCreator;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.OperationsPerInvocation;
import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
@@ -53,12 +56,16 @@ public class IncrementalIndexRowTypeBenchmark
NullHandling.initializeForTests();
}
- private IncrementalIndex incIndex;
- private IncrementalIndex incFloatIndex;
- private IncrementalIndex incStrIndex;
+ @Param({"250000"})
+ private int rowsPerSegment;
+
+ @Param({"onheap", "offheap"})
+ private String indexType;
+
+ private AppendableIndexSpec appendableIndexSpec;
+ IncrementalIndex<?> incIndex;
private static AggregatorFactory[] aggs;
static final int DIMENSION_COUNT = 8;
- static final int MAX_ROWS = 250000;
private ArrayList<InputRow> longRows = new ArrayList<InputRow>();
private ArrayList<InputRow> floatRows = new ArrayList<InputRow>();
@@ -124,46 +131,51 @@ public class IncrementalIndexRowTypeBenchmark
return new MapBasedInputRow(timestamp, dimensionList, builder.build());
}
- private IncrementalIndex makeIncIndex()
+ private IncrementalIndex<?> makeIncIndex()
{
- return new IncrementalIndex.Builder()
+ return appendableIndexSpec.builder()
.setSimpleTestingIndexSchema(aggs)
.setDeserializeComplexMetrics(false)
- .setMaxRowCount(MAX_ROWS)
- .buildOnheap();
+ .setMaxRowCount(rowsPerSegment)
+ .build();
}
@Setup
- public void setup()
+ public void setup() throws JsonProcessingException
{
- for (int i = 0; i < MAX_ROWS; i++) {
+ appendableIndexSpec = IncrementalIndexCreator.parseIndexType(indexType);
+
+ for (int i = 0; i < rowsPerSegment; i++) {
longRows.add(getLongRow(0, DIMENSION_COUNT));
}
- for (int i = 0; i < MAX_ROWS; i++) {
+ for (int i = 0; i < rowsPerSegment; i++) {
floatRows.add(getFloatRow(0, DIMENSION_COUNT));
}
- for (int i = 0; i < MAX_ROWS; i++) {
+ for (int i = 0; i < rowsPerSegment; i++) {
stringRows.add(getStringRow(0, DIMENSION_COUNT));
}
}
- @Setup(Level.Iteration)
+ @Setup(Level.Invocation)
public void setup2()
{
incIndex = makeIncIndex();
- incFloatIndex = makeIncIndex();
- incStrIndex = makeIncIndex();
+ }
+
+ /**
+ * Releases the incremental index created by {@code setup2()} once a benchmark invocation has finished.
+ */
+ // Must be a teardown fixture: when annotated with @Setup this ran as a second per-invocation setup step,
+ // so the index was closed before (not after) the measured method and was never released afterwards.
+ @org.openjdk.jmh.annotations.TearDown(Level.Invocation)
+ public void tearDown()
+ {
+ if (incIndex != null) {
+ incIndex.close();
+ }
+ }
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
- @OperationsPerInvocation(MAX_ROWS)
public void normalLongs(Blackhole blackhole) throws Exception
{
- for (int i = 0; i < MAX_ROWS; i++) {
+ for (int i = 0; i < rowsPerSegment; i++) {
InputRow row = longRows.get(i);
int rv = incIndex.add(row).getRowCount();
blackhole.consume(rv);
@@ -173,12 +185,11 @@ public class IncrementalIndexRowTypeBenchmark
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
- @OperationsPerInvocation(MAX_ROWS)
public void normalFloats(Blackhole blackhole) throws Exception
{
- for (int i = 0; i < MAX_ROWS; i++) {
+ for (int i = 0; i < rowsPerSegment; i++) {
InputRow row = floatRows.get(i);
- int rv = incFloatIndex.add(row).getRowCount();
+ int rv = incIndex.add(row).getRowCount();
blackhole.consume(rv);
}
}
@@ -186,12 +197,11 @@ public class IncrementalIndexRowTypeBenchmark
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
- @OperationsPerInvocation(MAX_ROWS)
public void normalStrings(Blackhole blackhole) throws Exception
{
- for (int i = 0; i < MAX_ROWS; i++) {
+ for (int i = 0; i < rowsPerSegment; i++) {
InputRow row = stringRows.get(i);
- int rv = incStrIndex.add(row).getRowCount();
+ int rv = incIndex.add(row).getRowCount();
blackhole.consume(rv);
}
}
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java
index fabb86a..b6078ae 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java
@@ -67,6 +67,7 @@ import org.apache.druid.segment.generator.DataGenerator;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.SegmentId;
@@ -308,10 +309,10 @@ public class TopNTypeInterfaceBenchmark
private IncrementalIndex makeIncIndex()
{
- return new IncrementalIndex.Builder()
+ return new OnheapIncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
.setMaxRowCount(rowsPerSegment)
- .buildOnheap();
+ .build();
}
private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java
index a49e122..c07c34f 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java
@@ -20,7 +20,6 @@
package org.apache.druid.benchmark.indexing;
import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.logger.Logger;
@@ -43,7 +42,9 @@ import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.generator.DataGenerator;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexCreator;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.apache.druid.segment.serde.ComplexMetrics;
@@ -57,6 +58,7 @@ import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
@@ -82,6 +84,9 @@ public class IncrementalIndexReadBenchmark
@Param({"true", "false"})
private boolean rollup;
+ @Param({"onheap", "offheap"})
+ private String indexType;
+
private static final Logger log = new Logger(IncrementalIndexReadBenchmark.class);
private static final int RNG_SEED = 9999;
@@ -89,8 +94,8 @@ public class IncrementalIndexReadBenchmark
NullHandling.initializeForTests();
}
- private IncrementalIndex incIndex;
-
+ private AppendableIndexSpec appendableIndexSpec;
+ private IncrementalIndex<?> incIndex;
private GeneratorSchemaInfo schemaInfo;
@Setup
@@ -102,6 +107,10 @@ public class IncrementalIndexReadBenchmark
schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schema);
+ // Creates an AppendableIndexSpec that corresponds to the indexType parametrization.
+ // It is used in {@code makeIncIndex()} to instantiate an incremental-index of the specified type.
+ appendableIndexSpec = IncrementalIndexCreator.parseIndexType(indexType);
+
DataGenerator gen = new DataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED,
@@ -110,20 +119,20 @@ public class IncrementalIndexReadBenchmark
);
incIndex = makeIncIndex();
+ gen.addToIndex(incIndex, rowsPerSegment);
+ }
- for (int j = 0; j < rowsPerSegment; j++) {
- InputRow row = gen.nextRow();
- if (j % 10000 == 0) {
- log.info(j + " rows generated.");
- }
- incIndex.add(row);
+ @TearDown
+ public void tearDown()
+ {
+ if (incIndex != null) {
+ incIndex.close();
}
-
}
- private IncrementalIndex makeIncIndex()
+ private IncrementalIndex<?> makeIncIndex()
{
- return new IncrementalIndex.Builder()
+ return appendableIndexSpec.builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMetrics(schemaInfo.getAggsArray())
@@ -131,7 +140,7 @@ public class IncrementalIndexReadBenchmark
.build()
)
.setMaxRowCount(rowsPerSegment)
- .buildOnheap();
+ .build();
}
@Benchmark
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexIngestionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexIngestionBenchmark.java
index 8448358..33819ef 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexIngestionBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexIngestionBenchmark.java
@@ -19,6 +19,7 @@
package org.apache.druid.benchmark.indexing;
+import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.logger.Logger;
@@ -26,7 +27,9 @@ import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import org.apache.druid.segment.generator.DataGenerator;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexCreator;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.openjdk.jmh.annotations.Benchmark;
@@ -40,10 +43,11 @@ import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
-import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@@ -61,6 +65,12 @@ public class IndexIngestionBenchmark
@Param({"true", "false"})
private boolean rollup;
+ @Param({"none", "moderate", "high"})
+ private String rollupOpportunity;
+
+ @Param({"onheap", "offheap"})
+ private String indexType;
+
private static final Logger log = new Logger(IndexIngestionBenchmark.class);
private static final int RNG_SEED = 9999;
@@ -68,32 +78,31 @@ public class IndexIngestionBenchmark
NullHandling.initializeForTests();
}
- private IncrementalIndex incIndex;
- private ArrayList<InputRow> rows;
+ private AppendableIndexSpec appendableIndexSpec;
+ private IncrementalIndex<?> incIndex;
+ private List<InputRow> rows;
private GeneratorSchemaInfo schemaInfo;
@Setup
- public void setup()
+ public void setup() throws JsonProcessingException
{
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
- rows = new ArrayList<InputRow>();
schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schema);
+ // Creates an AppendableIndexSpec that corresponds to the indexType parametrization.
+ // It is used in {@code makeIncIndex()} to instantiate an incremental-index of the specified type.
+ appendableIndexSpec = IncrementalIndexCreator.parseIndexType(indexType);
+
DataGenerator gen = new DataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED,
- schemaInfo.getDataInterval(),
- rowsPerSegment
+ schemaInfo.getDataInterval().getStartMillis(),
+ IndexPersistBenchmark.getValuesPerTimestamp(rollupOpportunity),
+ 1000.0
);
- for (int i = 0; i < rowsPerSegment; i++) {
- InputRow row = gen.nextRow();
- if (i % 10000 == 0) {
- log.info(i + " rows generated.");
- }
- rows.add(row);
- }
+ rows = gen.toList(rowsPerSegment);
}
@Setup(Level.Invocation)
@@ -102,9 +111,17 @@ public class IndexIngestionBenchmark
incIndex = makeIncIndex();
}
- private IncrementalIndex makeIncIndex()
+ @TearDown(Level.Invocation)
+ public void tearDown()
+ {
+ if (incIndex != null) {
+ incIndex.close();
+ }
+ }
+
+ private IncrementalIndex<?> makeIncIndex()
{
- return new IncrementalIndex.Builder()
+ return appendableIndexSpec.builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMetrics(schemaInfo.getAggsArray())
@@ -112,7 +129,7 @@ public class IndexIngestionBenchmark
.build()
)
.setMaxRowCount(rowsPerSegment * 2)
- .buildOnheap();
+ .build();
}
@Benchmark
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java
index ce34070..44be3cc 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java
@@ -22,7 +22,6 @@ package org.apache.druid.benchmark.indexing;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.data.input.InputRow;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.logger.Logger;
@@ -37,6 +36,7 @@ import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory;
@@ -131,15 +131,9 @@ public class IndexMergeBenchmark
rowsPerSegment
);
- IncrementalIndex incIndex = makeIncIndex();
+ IncrementalIndex<?> incIndex = makeIncIndex();
- for (int j = 0; j < rowsPerSegment; j++) {
- InputRow row = gen.nextRow();
- if (j % 10000 == 0) {
- log.info(j + " rows generated.");
- }
- incIndex.add(row);
- }
+ gen.addToIndex(incIndex, rowsPerSegment);
tmpDir = FileUtils.createTempDir();
log.info("Using temp dir: " + tmpDir.getAbsolutePath());
@@ -212,7 +206,7 @@ public class IndexMergeBenchmark
private IncrementalIndex makeIncIndex()
{
- return new IncrementalIndex.Builder()
+ return new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMetrics(schemaInfo.getAggsArray())
@@ -220,6 +214,6 @@ public class IndexMergeBenchmark
.build()
)
.setMaxRowCount(rowsPerSegment)
- .buildOnheap();
+ .build();
}
}
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java
index 755947d..3679ada 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java
@@ -19,6 +19,7 @@
package org.apache.druid.benchmark.indexing;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
@@ -32,7 +33,9 @@ import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.generator.DataGenerator;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexCreator;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
@@ -53,7 +56,7 @@ import org.openjdk.jmh.infra.Blackhole;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@@ -90,68 +93,84 @@ public class IndexPersistBenchmark
@Param({"none", "moderate", "high"})
private String rollupOpportunity;
- private IncrementalIndex incIndex;
- private ArrayList<InputRow> rows;
+ @Param({"onheap", "offheap"})
+ private String indexType;
+
+ private AppendableIndexSpec appendableIndexSpec;
+ private IncrementalIndex<?> incIndex;
+ private List<InputRow> rows;
private GeneratorSchemaInfo schemaInfo;
+ private File tmpDir;
@Setup
- public void setup()
+ public void setup() throws JsonProcessingException
{
log.info("SETUP CALLED AT " + System.currentTimeMillis());
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
- rows = new ArrayList<InputRow>();
schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schema);
- int valuesPerTimestamp = 1;
- switch (rollupOpportunity) {
- case "moderate":
- valuesPerTimestamp = 1000;
- break;
- case "high":
- valuesPerTimestamp = 10000;
- break;
-
- }
+ // Creates an AppendableIndexSpec that corresponds to the indexType parametrization.
+ // It is used in {@code makeIncIndex()} to instantiate an incremental-index of the specified type.
+ appendableIndexSpec = IncrementalIndexCreator.parseIndexType(indexType);
DataGenerator gen = new DataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED,
schemaInfo.getDataInterval().getStartMillis(),
- valuesPerTimestamp,
+ getValuesPerTimestamp(rollupOpportunity),
1000.0
);
- for (int i = 0; i < rowsPerSegment; i++) {
- InputRow row = gen.nextRow();
- if (i % 10000 == 0) {
- log.info(i + " rows generated.");
- }
- rows.add(row);
+ rows = gen.toList(rowsPerSegment);
+ }
+
+ public static int getValuesPerTimestamp(String rollupOpportunity)
+ {
+ switch (rollupOpportunity) {
+ case "moderate":
+ return 1000;
+ case "high":
+ return 10000;
+ case "none":
+ return 1;
+ default:
+ throw new IllegalArgumentException("Rollup opportunity must be moderate, high or none.");
}
}
@Setup(Level.Iteration)
- public void setup2() throws IOException
+ public void setup2()
{
incIndex = makeIncIndex();
- for (int i = 0; i < rowsPerSegment; i++) {
- InputRow row = rows.get(i);
- incIndex.add(row);
- }
+ DataGenerator.addStreamToIndex(rows.stream(), incIndex);
}
@TearDown(Level.Iteration)
public void teardown()
{
- incIndex.close();
- incIndex = null;
+ if (incIndex != null) {
+ incIndex.close();
+ }
}
- private IncrementalIndex makeIncIndex()
+ @Setup(Level.Invocation)
+ public void setupTemp()
{
- return new IncrementalIndex.Builder()
+ tmpDir = FileUtils.createTempDir();
+ log.info("Using temp dir: " + tmpDir.getAbsolutePath());
+ }
+
+ @TearDown(Level.Invocation)
+ public void teardownTemp() throws IOException
+ {
+ FileUtils.deleteDirectory(tmpDir);
+ }
+
+ private IncrementalIndex<?> makeIncIndex()
+ {
+ return appendableIndexSpec.builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMetrics(schemaInfo.getAggsArray())
@@ -159,7 +178,7 @@ public class IndexPersistBenchmark
.build()
)
.setMaxRowCount(rowsPerSegment)
- .buildOnheap();
+ .build();
}
@Benchmark
@@ -167,21 +186,13 @@ public class IndexPersistBenchmark
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void persistV9(Blackhole blackhole) throws Exception
{
- File tmpDir = FileUtils.createTempDir();
- log.info("Using temp dir: " + tmpDir.getAbsolutePath());
- try {
- File indexFile = INDEX_MERGER_V9.persist(
- incIndex,
- tmpDir,
- new IndexSpec(),
- null
- );
-
- blackhole.consume(indexFile);
+ File indexFile = INDEX_MERGER_V9.persist(
+ incIndex,
+ tmpDir,
+ new IndexSpec(),
+ null
+ );
- }
- finally {
- FileUtils.deleteDirectory(tmpDir);
- }
+ blackhole.consume(indexFile);
}
}
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
index 1e93d16..b9808c9 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
@@ -19,6 +19,7 @@
package org.apache.druid.benchmark.query;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
@@ -31,7 +32,6 @@ import org.apache.druid.collections.DefaultBlockingPool;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.collections.StupidPool;
import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.data.input.InputRow;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
@@ -85,8 +85,11 @@ import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.generator.DataGenerator;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexCreator;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.SegmentId;
@@ -122,9 +125,6 @@ import java.util.concurrent.TimeUnit;
@Measurement(iterations = 25)
public class GroupByBenchmark
{
- @Param({"4"})
- private int numSegments;
-
@Param({"2", "4"})
private int numProcessingThreads;
@@ -156,17 +156,12 @@ public class GroupByBenchmark
NullHandling.initializeForTests();
}
- private File tmpDir;
- private IncrementalIndex anIncrementalIndex;
- private List<QueryableIndex> queryableIndexes;
-
+ private AppendableIndexSpec appendableIndexSpec;
+ private DataGenerator generator;
private QueryRunnerFactory<ResultRow, GroupByQuery> factory;
-
private GeneratorSchemaInfo schemaInfo;
private GroupByQuery query;
- private ExecutorService executorService;
-
static {
JSON_MAPPER = new DefaultObjectMapper();
INDEX_IO = new IndexIO(
@@ -429,15 +424,16 @@ public class GroupByBenchmark
SCHEMA_QUERY_MAP.put("nulls", nullQueries);
}
+ /**
+ * Set up everything common for benchmarking both the incremental-index and the queryable-index.
+ */
@Setup(Level.Trial)
- public void setup() throws IOException
+ public void setup()
{
log.info("SETUP CALLED AT " + +System.currentTimeMillis());
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
- executorService = Execs.multiThreaded(numProcessingThreads, "GroupByThreadPool[%d]");
-
setupQueries();
String[] schemaQuery = schemaAndQuery.split("\\.");
@@ -447,58 +443,13 @@ public class GroupByBenchmark
schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schemaName);
query = SCHEMA_QUERY_MAP.get(schemaName).get(queryName);
- final DataGenerator dataGenerator = new DataGenerator(
+ generator = new DataGenerator(
schemaInfo.getColumnSchemas(),
- RNG_SEED + 1,
+ RNG_SEED,
schemaInfo.getDataInterval(),
rowsPerSegment
);
- tmpDir = FileUtils.createTempDir();
- log.info("Using temp dir: %s", tmpDir.getAbsolutePath());
-
- // queryableIndexes -> numSegments worth of on-disk segments
- // anIncrementalIndex -> the last incremental index
- anIncrementalIndex = null;
- queryableIndexes = new ArrayList<>(numSegments);
-
- for (int i = 0; i < numSegments; i++) {
- log.info("Generating rows for segment %d/%d", i + 1, numSegments);
-
- final IncrementalIndex index = makeIncIndex(schemaInfo.isWithRollup());
-
- for (int j = 0; j < rowsPerSegment; j++) {
- final InputRow row = dataGenerator.nextRow();
- if (j % 20000 == 0) {
- log.info("%,d/%,d rows generated.", i * rowsPerSegment + j, rowsPerSegment * numSegments);
- }
- index.add(row);
- }
-
- log.info(
- "%,d/%,d rows generated, persisting segment %d/%d.",
- (i + 1) * rowsPerSegment,
- rowsPerSegment * numSegments,
- i + 1,
- numSegments
- );
-
- final File file = INDEX_MERGER_V9.persist(
- index,
- new File(tmpDir, String.valueOf(i)),
- new IndexSpec(),
- null
- );
-
- queryableIndexes.add(INDEX_IO.loadIndex(file));
-
- if (i == numSegments - 1) {
- anIncrementalIndex = index;
- } else {
- index.close();
- }
- }
-
NonBlockingPool<ByteBuffer> bufferPool = new StupidPool<>(
"GroupByBenchmark-computeBufferPool",
new OffheapBufferGenerator("compute", 250_000_000),
@@ -576,43 +527,114 @@ public class GroupByBenchmark
);
}
- private IncrementalIndex makeIncIndex(boolean withRollup)
+ /**
+ * Setup/teardown everything specific for benchmarking the incremental-index.
+ */
+ @State(Scope.Benchmark)
+ public static class IncrementalIndexState
{
- return new IncrementalIndex.Builder()
- .setIndexSchema(
- new IncrementalIndexSchema.Builder()
- .withDimensionsSpec(schemaInfo.getDimensionsSpec())
- .withMetrics(schemaInfo.getAggsArray())
- .withRollup(withRollup)
- .build()
- )
- .setConcurrentEventAdd(true)
- .setMaxRowCount(rowsPerSegment)
- .buildOnheap();
+ @Param({"onheap", "offheap"})
+ private String indexType;
+
+ IncrementalIndex<?> incIndex;
+
+ @Setup(Level.Trial)
+ public void setup(GroupByBenchmark global) throws JsonProcessingException
+ {
+ // Creates an AppendableIndexSpec that corresponds to the indexType parametrization.
+ // It is used in {@code global.makeIncIndex()} to instantiate an incremental-index of the specified type.
+ global.appendableIndexSpec = IncrementalIndexCreator.parseIndexType(indexType);
+ incIndex = global.makeIncIndex(global.schemaInfo.isWithRollup());
+ global.generator.addToIndex(incIndex, global.rowsPerSegment);
+ }
+
+ @TearDown(Level.Trial)
+ public void tearDown()
+ {
+ if (incIndex != null) {
+ incIndex.close();
+ }
+ }
}
- @TearDown(Level.Trial)
- public void tearDown()
+ /**
+ * Setup/teardown everything specific for benchmarking the queryable-index.
+ */
+ @State(Scope.Benchmark)
+ public static class QueryableIndexState
{
- try {
- if (anIncrementalIndex != null) {
- anIncrementalIndex.close();
+ @Param({"4"})
+ private int numSegments;
+
+ private ExecutorService executorService;
+ private File qIndexesDir;
+ private List<QueryableIndex> queryableIndexes;
+
+ @Setup(Level.Trial)
+ public void setup(GroupByBenchmark global) throws IOException
+ {
+ global.appendableIndexSpec = new OnheapIncrementalIndex.Spec();
+
+ executorService = Execs.multiThreaded(global.numProcessingThreads, "GroupByThreadPool[%d]");
+
+ qIndexesDir = FileUtils.createTempDir();
+
+ // numSegments worth of on-disk segments
+ queryableIndexes = new ArrayList<>();
+
+ for (int i = 0; i < numSegments; i++) {
+ log.info("Generating rows for segment %d/%d", i + 1, numSegments);
+
+ final IncrementalIndex<?> incIndex = global.makeIncIndex(global.schemaInfo.isWithRollup());
+ global.generator.reset(RNG_SEED + i).addToIndex(incIndex, global.rowsPerSegment);
+
+ log.info(
+ "%,d/%,d rows generated, persisting segment %d/%d.",
+ (i + 1) * global.rowsPerSegment,
+ global.rowsPerSegment * numSegments,
+ i + 1,
+ numSegments
+ );
+
+ File indexFile = INDEX_MERGER_V9.persist(
+ incIndex,
+ new File(qIndexesDir, String.valueOf(i)),
+ new IndexSpec(),
+ null
+ );
+ incIndex.close();
+
+ queryableIndexes.add(INDEX_IO.loadIndex(indexFile));
}
+ }
- if (queryableIndexes != null) {
- for (QueryableIndex index : queryableIndexes) {
+ @TearDown(Level.Trial)
+ public void tearDown()
+ {
+ for (QueryableIndex index : queryableIndexes) {
+ if (index != null) {
index.close();
}
}
-
- if (tmpDir != null) {
- FileUtils.deleteDirectory(tmpDir);
+ if (qIndexesDir != null) {
+ qIndexesDir.delete();
}
}
- catch (IOException e) {
- log.warn(e, "Failed to tear down, temp dir was: %s", tmpDir);
- throw new RuntimeException(e);
- }
+ }
+
+ private IncrementalIndex<?> makeIncIndex(boolean withRollup)
+ {
+ return appendableIndexSpec.builder()
+ .setIndexSchema(
+ new IncrementalIndexSchema.Builder()
+ .withDimensionsSpec(schemaInfo.getDimensionsSpec())
+ .withMetrics(schemaInfo.getAggsArray())
+ .withRollup(withRollup)
+ .build()
+ )
+ .setConcurrentEventAdd(true)
+ .setMaxRowCount(rowsPerSegment)
+ .build();
}
private static <T> Sequence<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)
@@ -629,12 +651,12 @@ public class GroupByBenchmark
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
- public void querySingleIncrementalIndex(Blackhole blackhole)
+ public void querySingleIncrementalIndex(Blackhole blackhole, IncrementalIndexState state)
{
QueryRunner<ResultRow> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
SegmentId.dummy("incIndex"),
- new IncrementalIndexSegment(anIncrementalIndex, SegmentId.dummy("incIndex"))
+ new IncrementalIndexSegment(state.incIndex, SegmentId.dummy("incIndex"))
);
final Sequence<ResultRow> results = GroupByBenchmark.runQuery(factory, runner, query);
@@ -649,12 +671,12 @@ public class GroupByBenchmark
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
- public void querySingleQueryableIndex(Blackhole blackhole)
+ public void querySingleQueryableIndex(Blackhole blackhole, QueryableIndexState state)
{
QueryRunner<ResultRow> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
SegmentId.dummy("qIndex"),
- new QueryableIndexSegment(queryableIndexes.get(0), SegmentId.dummy("qIndex"))
+ new QueryableIndexSegment(state.queryableIndexes.get(0), SegmentId.dummy("qIndex"))
);
final Sequence<ResultRow> results = GroupByBenchmark.runQuery(factory, runner, query);
@@ -669,12 +691,12 @@ public class GroupByBenchmark
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
- public void queryMultiQueryableIndexX(Blackhole blackhole)
+ public void queryMultiQueryableIndexX(Blackhole blackhole, QueryableIndexState state)
{
QueryToolChest<ResultRow, GroupByQuery> toolChest = factory.getToolchest();
QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
- factory.mergeRunners(executorService, makeMultiRunners())
+ factory.mergeRunners(state.executorService, makeMultiRunners(state))
),
(QueryToolChest) toolChest
);
@@ -691,12 +713,12 @@ public class GroupByBenchmark
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
- public void queryMultiQueryableIndexTTFR(Blackhole blackhole) throws IOException
+ public void queryMultiQueryableIndexTTFR(Blackhole blackhole, QueryableIndexState state) throws IOException
{
QueryToolChest<ResultRow, GroupByQuery> toolChest = factory.getToolchest();
QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
- factory.mergeRunners(executorService, makeMultiRunners())
+ factory.mergeRunners(state.executorService, makeMultiRunners(state))
),
(QueryToolChest) toolChest
);
@@ -709,12 +731,12 @@ public class GroupByBenchmark
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
- public void queryMultiQueryableIndexWithSpilling(Blackhole blackhole)
+ public void queryMultiQueryableIndexWithSpilling(Blackhole blackhole, QueryableIndexState state)
{
QueryToolChest<ResultRow, GroupByQuery> toolChest = factory.getToolchest();
QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
- factory.mergeRunners(executorService, makeMultiRunners())
+ factory.mergeRunners(state.executorService, makeMultiRunners(state))
),
(QueryToolChest) toolChest
);
@@ -735,12 +757,12 @@ public class GroupByBenchmark
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
- public void queryMultiQueryableIndexWithSpillingTTFR(Blackhole blackhole) throws IOException
+ public void queryMultiQueryableIndexWithSpillingTTFR(Blackhole blackhole, QueryableIndexState state) throws IOException
{
QueryToolChest<ResultRow, GroupByQuery> toolChest = factory.getToolchest();
QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
- factory.mergeRunners(executorService, makeMultiRunners())
+ factory.mergeRunners(state.executorService, makeMultiRunners(state))
),
(QueryToolChest) toolChest
);
@@ -756,7 +778,7 @@ public class GroupByBenchmark
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
- public void queryMultiQueryableIndexWithSerde(Blackhole blackhole)
+ public void queryMultiQueryableIndexWithSerde(Blackhole blackhole, QueryableIndexState state)
{
QueryToolChest<ResultRow, GroupByQuery> toolChest = factory.getToolchest();
//noinspection unchecked
@@ -766,7 +788,7 @@ public class GroupByBenchmark
new DefaultObjectMapper(new SmileFactory()),
ResultRow.class,
toolChest.mergeResults(
- factory.mergeRunners(executorService, makeMultiRunners())
+ factory.mergeRunners(state.executorService, makeMultiRunners(state))
)
)
),
@@ -778,15 +800,15 @@ public class GroupByBenchmark
blackhole.consume(results);
}
- private List<QueryRunner<ResultRow>> makeMultiRunners()
+ private List<QueryRunner<ResultRow>> makeMultiRunners(QueryableIndexState state)
{
List<QueryRunner<ResultRow>> runners = new ArrayList<>();
- for (int i = 0; i < numSegments; i++) {
+ for (int i = 0; i < state.numSegments; i++) {
String segmentName = "qIndex " + i;
QueryRunner<ResultRow> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
SegmentId.dummy(segmentName),
- new QueryableIndexSegment(queryableIndexes.get(i), SegmentId.dummy(segmentName))
+ new QueryableIndexSegment(state.queryableIndexes.get(i), SegmentId.dummy(segmentName))
);
runners.add(factory.getToolchest().preMergeQueryDecoration(runner));
}
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/ScanBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/ScanBenchmark.java
index b543076..10c31b5 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/ScanBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/ScanBenchmark.java
@@ -19,11 +19,11 @@
package org.apache.druid.benchmark.query;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.Row;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.FileUtils;
@@ -67,7 +67,10 @@ import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.generator.DataGenerator;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexCreator;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.SegmentId;
@@ -102,12 +105,6 @@ import java.util.concurrent.TimeUnit;
@Measurement(iterations = 25)
public class ScanBenchmark
{
- @Param({"2", "4"})
- private int numSegments;
-
- @Param({"2"})
- private int numProcessingThreads;
-
@Param({"200000"})
private int rowsPerSegment;
@@ -121,6 +118,7 @@ public class ScanBenchmark
private static ScanQuery.Order ordering;
private static final Logger log = new Logger(ScanBenchmark.class);
+ private static final int RNG_SEED = 9999;
private static final ObjectMapper JSON_MAPPER;
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
@@ -129,16 +127,12 @@ public class ScanBenchmark
NullHandling.initializeForTests();
}
- private List<IncrementalIndex> incIndexes;
- private List<QueryableIndex> qIndexes;
-
+ private AppendableIndexSpec appendableIndexSpec;
+ private DataGenerator generator;
private QueryRunnerFactory factory;
private GeneratorSchemaInfo schemaInfo;
private Druids.ScanQueryBuilder queryBuilder;
private ScanQuery query;
- private File tmpDir;
-
- private ExecutorService executorService;
static {
JSON_MAPPER = new DefaultObjectMapper();
@@ -242,15 +236,16 @@ public class ScanBenchmark
.order(ordering);
}
+ /**
+ * Set up everything common for benchmarking both the incremental-index and the queryable-index.
+ */
@Setup
- public void setup() throws IOException
+ public void setup()
{
log.info("SETUP CALLED AT " + +System.currentTimeMillis());
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
- executorService = Execs.multiThreaded(numProcessingThreads, "ScanThreadPool");
-
setupQueries();
String[] schemaQuery = schemaAndQuery.split("\\.");
@@ -262,43 +257,12 @@ public class ScanBenchmark
queryBuilder.limit(limit);
query = queryBuilder.build();
- incIndexes = new ArrayList<>();
- for (int i = 0; i < numSegments; i++) {
- log.info("Generating rows for segment " + i);
- DataGenerator gen = new DataGenerator(
- schemaInfo.getColumnSchemas(),
- System.currentTimeMillis(),
- schemaInfo.getDataInterval(),
- rowsPerSegment
- );
-
- IncrementalIndex incIndex = makeIncIndex();
-
- for (int j = 0; j < rowsPerSegment; j++) {
- InputRow row = gen.nextRow();
- if (j % 10000 == 0) {
- log.info(j + " rows generated.");
- }
- incIndex.add(row);
- }
- incIndexes.add(incIndex);
- }
-
- tmpDir = FileUtils.createTempDir();
- log.info("Using temp dir: " + tmpDir.getAbsolutePath());
-
- qIndexes = new ArrayList<>();
- for (int i = 0; i < numSegments; i++) {
- File indexFile = INDEX_MERGER_V9.persist(
- incIndexes.get(i),
- tmpDir,
- new IndexSpec(),
- null
- );
-
- QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile);
- qIndexes.add(qIndex);
- }
+ generator = new DataGenerator(
+ schemaInfo.getColumnSchemas(),
+ System.currentTimeMillis(),
+ schemaInfo.getDataInterval(),
+ rowsPerSegment
+ );
final ScanQueryConfig config = new ScanQueryConfig().setLegacy(false);
factory = new ScanQueryRunnerFactory(
@@ -311,18 +275,100 @@ public class ScanBenchmark
);
}
- @TearDown
- public void tearDown() throws IOException
+ /**
+ * Setup/teardown everything specific for benchmarking the incremental-index.
+ */
+ @State(Scope.Benchmark)
+ public static class IncrementalIndexState
{
- FileUtils.deleteDirectory(tmpDir);
+ @Param({"onheap", "offheap"})
+ private String indexType;
+
+ IncrementalIndex<?> incIndex;
+
+ @Setup
+ public void setup(ScanBenchmark global) throws JsonProcessingException
+ {
+ // Creates an AppendableIndexSpec that corresponds to the indexType parametrization.
+ // It is used in {@code global.makeIncIndex()} to instantiate an incremental-index of the specified type.
+ global.appendableIndexSpec = IncrementalIndexCreator.parseIndexType(indexType);
+ incIndex = global.makeIncIndex();
+ global.generator.addToIndex(incIndex, global.rowsPerSegment);
+ }
+
+ @TearDown
+ public void tearDown()
+ {
+ if (incIndex != null) {
+ incIndex.close();
+ }
+ }
+ }
+
+ /**
+ * Setup/teardown everything specific for benchmarking the queryable-index.
+ */
+ @State(Scope.Benchmark)
+ public static class QueryableIndexState
+ {
+ @Param({"2", "4"})
+ private int numSegments;
+
+ @Param({"2"})
+ private int numProcessingThreads;
+
+ private ExecutorService executorService;
+ private File qIndexesDir;
+ private List<QueryableIndex> qIndexes;
+
+ @Setup
+ public void setup(ScanBenchmark global) throws IOException
+ {
+ global.appendableIndexSpec = new OnheapIncrementalIndex.Spec();
+
+ executorService = Execs.multiThreaded(numProcessingThreads, "ScanThreadPool");
+
+ qIndexesDir = FileUtils.createTempDir();
+ qIndexes = new ArrayList<>();
+
+ for (int i = 0; i < numSegments; i++) {
+ log.info("Generating rows for segment " + i);
+
+ IncrementalIndex<?> incIndex = global.makeIncIndex();
+ global.generator.reset(RNG_SEED + i).addToIndex(incIndex, global.rowsPerSegment);
+
+ File indexFile = INDEX_MERGER_V9.persist(
+ incIndex,
+ new File(qIndexesDir, String.valueOf(i)),
+ new IndexSpec(),
+ null
+ );
+ incIndex.close();
+
+ qIndexes.add(INDEX_IO.loadIndex(indexFile));
+ }
+ }
+
+ @TearDown
+ public void tearDown()
+ {
+ for (QueryableIndex index : qIndexes) {
+ if (index != null) {
+ index.close();
+ }
+ }
+ if (qIndexesDir != null) {
+ qIndexesDir.delete();
+ }
+ }
}
- private IncrementalIndex makeIncIndex()
+ private IncrementalIndex<?> makeIncIndex()
{
- return new IncrementalIndex.Builder()
+ return appendableIndexSpec.builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
.setMaxRowCount(rowsPerSegment)
- .buildOnheap();
+ .build();
}
private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)
@@ -340,12 +386,12 @@ public class ScanBenchmark
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
- public void querySingleIncrementalIndex(Blackhole blackhole)
+ public void querySingleIncrementalIndex(Blackhole blackhole, IncrementalIndexState state)
{
QueryRunner<ScanResultValue> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
SegmentId.dummy("incIndex"),
- new IncrementalIndexSegment(incIndexes.get(0), SegmentId.dummy("incIndex"))
+ new IncrementalIndexSegment(state.incIndex, SegmentId.dummy("incIndex"))
);
Query effectiveQuery = query
@@ -372,12 +418,12 @@ public class ScanBenchmark
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
- public void querySingleQueryableIndex(Blackhole blackhole)
+ public void querySingleQueryableIndex(Blackhole blackhole, QueryableIndexState state)
{
final QueryRunner<Result<ScanResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
SegmentId.dummy("qIndex"),
- new QueryableIndexSegment(qIndexes.get(0), SegmentId.dummy("qIndex"))
+ new QueryableIndexSegment(state.qIndexes.get(0), SegmentId.dummy("qIndex"))
);
Query effectiveQuery = query
@@ -404,17 +450,17 @@ public class ScanBenchmark
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
- public void queryMultiQueryableIndex(Blackhole blackhole)
+ public void queryMultiQueryableIndex(Blackhole blackhole, QueryableIndexState state)
{
List<SegmentDescriptor> segmentDescriptors = new ArrayList<>();
List<QueryRunner<Row>> runners = new ArrayList<>();
QueryToolChest toolChest = factory.getToolchest();
- for (int i = 0; i < numSegments; i++) {
+ for (int i = 0; i < state.numSegments; i++) {
String segmentName = "qIndex";
final QueryRunner<Result<ScanResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
SegmentId.dummy(segmentName),
- new QueryableIndexSegment(qIndexes.get(i), SegmentId.dummy(segmentName, i))
+ new QueryableIndexSegment(state.qIndexes.get(i), SegmentId.dummy(segmentName, i))
);
segmentDescriptors.add(
new SegmentDescriptor(
@@ -428,7 +474,7 @@ public class ScanBenchmark
QueryRunner theRunner = toolChest.postMergeQueryDecoration(
new FinalizeResultsQueryRunner<>(
- toolChest.mergeResults(factory.mergeRunners(executorService, runners)),
+ toolChest.mergeResults(factory.mergeRunners(state.executorService, runners)),
toolChest
)
);
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SearchBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SearchBenchmark.java
index 680a179f..2060591 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SearchBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SearchBenchmark.java
@@ -20,12 +20,12 @@
package org.apache.druid.benchmark.query;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.Row;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.FileUtils;
@@ -74,7 +74,10 @@ import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.generator.DataGenerator;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexCreator;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.SegmentId;
@@ -108,9 +111,6 @@ import java.util.concurrent.TimeUnit;
@Measurement(iterations = 25)
public class SearchBenchmark
{
- @Param({"1"})
- private int numSegments;
-
@Param({"750000"})
private int rowsPerSegment;
@@ -121,6 +121,7 @@ public class SearchBenchmark
private int limit;
private static final Logger log = new Logger(SearchBenchmark.class);
+ private static final int RNG_SEED = 9999;
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
public static final ObjectMapper JSON_MAPPER;
@@ -129,16 +130,12 @@ public class SearchBenchmark
NullHandling.initializeForTests();
}
- private List<IncrementalIndex> incIndexes;
- private List<QueryableIndex> qIndexes;
-
+ private AppendableIndexSpec appendableIndexSpec;
+ private DataGenerator generator;
private QueryRunnerFactory factory;
private GeneratorSchemaInfo schemaInfo;
private Druids.SearchQueryBuilder queryBuilder;
private SearchQuery query;
- private File tmpDir;
-
- private ExecutorService executorService;
static {
JSON_MAPPER = new DefaultObjectMapper();
@@ -312,15 +309,16 @@ public class SearchBenchmark
.filters(new AndDimFilter(dimFilters));
}
+ /**
+ * Set up everything common for benchmarking both the incremental-index and the queryable-index.
+ */
@Setup
- public void setup() throws IOException
+ public void setup()
{
log.info("SETUP CALLED AT " + +System.currentTimeMillis());
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
- executorService = Execs.multiThreaded(numSegments, "SearchThreadPool");
-
setupQueries();
String[] schemaQuery = schemaAndQuery.split("\\.");
@@ -332,43 +330,12 @@ public class SearchBenchmark
queryBuilder.limit(limit);
query = queryBuilder.build();
- incIndexes = new ArrayList<>();
- for (int i = 0; i < numSegments; i++) {
- log.info("Generating rows for segment " + i);
- DataGenerator gen = new DataGenerator(
- schemaInfo.getColumnSchemas(),
- System.currentTimeMillis(),
- schemaInfo.getDataInterval(),
- rowsPerSegment
- );
-
- IncrementalIndex incIndex = makeIncIndex();
-
- for (int j = 0; j < rowsPerSegment; j++) {
- InputRow row = gen.nextRow();
- if (j % 10000 == 0) {
- log.info(j + " rows generated.");
- }
- incIndex.add(row);
- }
- incIndexes.add(incIndex);
- }
-
- tmpDir = FileUtils.createTempDir();
- log.info("Using temp dir: " + tmpDir.getAbsolutePath());
-
- qIndexes = new ArrayList<>();
- for (int i = 0; i < numSegments; i++) {
- File indexFile = INDEX_MERGER_V9.persist(
- incIndexes.get(i),
- tmpDir,
- new IndexSpec(),
- null
- );
-
- QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile);
- qIndexes.add(qIndex);
- }
+ generator = new DataGenerator(
+ schemaInfo.getColumnSchemas(),
+ RNG_SEED,
+ schemaInfo.getDataInterval(),
+ rowsPerSegment
+ );
final SearchQueryConfig config = new SearchQueryConfig().withOverrides(query);
factory = new SearchQueryRunnerFactory(
@@ -378,18 +345,97 @@ public class SearchBenchmark
);
}
- @TearDown
- public void tearDown() throws IOException
+ /**
+ * Setup/teardown everything specific for benchmarking the incremental-index.
+ */
+ @State(Scope.Benchmark)
+ public static class IncrementalIndexState
+ {
+ @Param({"onheap", "offheap"})
+ private String indexType;
+
+ IncrementalIndex<?> incIndex;
+
+ @Setup
+ public void setup(SearchBenchmark global) throws JsonProcessingException
+ {
+ // Creates an AppendableIndexSpec that corresponds to the indexType parametrization.
+ // It is used in {@code global.makeIncIndex()} to instantiate an incremental-index of the specified type.
+ global.appendableIndexSpec = IncrementalIndexCreator.parseIndexType(indexType);
+ incIndex = global.makeIncIndex();
+ global.generator.addToIndex(incIndex, global.rowsPerSegment);
+ }
+
+ @TearDown
+ public void tearDown()
+ {
+ if (incIndex != null) {
+ incIndex.close();
+ }
+ }
+ }
+
+ /**
+ * Setup/teardown everything specific for benchmarking the queryable-index.
+ */
+ @State(Scope.Benchmark)
+ public static class QueryableIndexState
{
- FileUtils.deleteDirectory(tmpDir);
+ @Param({"1"})
+ private int numSegments;
+
+ private ExecutorService executorService;
+ private File qIndexesDir;
+ private List<QueryableIndex> qIndexes;
+
+ @Setup
+ public void setup(SearchBenchmark global) throws IOException
+ {
+ global.appendableIndexSpec = new OnheapIncrementalIndex.Spec();
+
+ executorService = Execs.multiThreaded(numSegments, "SearchThreadPool");
+
+ qIndexesDir = FileUtils.createTempDir();
+ qIndexes = new ArrayList<>();
+
+ for (int i = 0; i < numSegments; i++) {
+ log.info("Generating rows for segment " + i);
+
+ IncrementalIndex<?> incIndex = global.makeIncIndex();
+ global.generator.reset(RNG_SEED + i).addToIndex(incIndex, global.rowsPerSegment);
+
+ File indexFile = INDEX_MERGER_V9.persist(
+ incIndex,
+ new File(qIndexesDir, String.valueOf(i)),
+ new IndexSpec(),
+ null
+ );
+ incIndex.close();
+
+ qIndexes.add(INDEX_IO.loadIndex(indexFile));
+ }
+ }
+
+ @TearDown
+ public void tearDown()
+ {
+ for (QueryableIndex index : qIndexes) {
+ if (index != null) {
+ index.close();
+ }
+ }
+ if (qIndexesDir != null) {
+ qIndexesDir.delete();
+ }
+ }
}
- private IncrementalIndex makeIncIndex()
+ private IncrementalIndex<?> makeIncIndex()
{
- return new IncrementalIndex.Builder()
+ return appendableIndexSpec.builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
.setMaxRowCount(rowsPerSegment)
- .buildOnheap();
+ .build();
}
private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)
@@ -407,12 +453,12 @@ public class SearchBenchmark
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
- public void querySingleIncrementalIndex(Blackhole blackhole)
+ public void querySingleIncrementalIndex(Blackhole blackhole, IncrementalIndexState state)
{
QueryRunner<SearchHit> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
SegmentId.dummy("incIndex"),
- new IncrementalIndexSegment(incIndexes.get(0), SegmentId.dummy("incIndex"))
+ new IncrementalIndexSegment(state.incIndex, SegmentId.dummy("incIndex"))
);
List<Result<SearchResultValue>> results = SearchBenchmark.runQuery(factory, runner, query);
@@ -422,12 +468,12 @@ public class SearchBenchmark
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
- public void querySingleQueryableIndex(Blackhole blackhole)
+ public void querySingleQueryableIndex(Blackhole blackhole, QueryableIndexState state)
{
final QueryRunner<Result<SearchResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
SegmentId.dummy("qIndex"),
- new QueryableIndexSegment(qIndexes.get(0), SegmentId.dummy("qIndex"))
+ new QueryableIndexSegment(state.qIndexes.get(0), SegmentId.dummy("qIndex"))
);
List<Result<SearchResultValue>> results = SearchBenchmark.runQuery(factory, runner, query);
@@ -438,23 +484,23 @@ public class SearchBenchmark
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
- public void queryMultiQueryableIndex(Blackhole blackhole)
+ public void queryMultiQueryableIndex(Blackhole blackhole, QueryableIndexState state)
{
List<QueryRunner<Row>> singleSegmentRunners = new ArrayList<>();
QueryToolChest toolChest = factory.getToolchest();
- for (int i = 0; i < numSegments; i++) {
+ for (int i = 0; i < state.numSegments; i++) {
String segmentName = "qIndex " + i;
final QueryRunner<Result<SearchResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
SegmentId.dummy(segmentName),
- new QueryableIndexSegment(qIndexes.get(i), SegmentId.dummy(segmentName))
+ new QueryableIndexSegment(state.qIndexes.get(i), SegmentId.dummy(segmentName))
);
singleSegmentRunners.add(toolChest.preMergeQueryDecoration(runner));
}
QueryRunner theRunner = toolChest.postMergeQueryDecoration(
new FinalizeResultsQueryRunner<>(
- toolChest.mergeResults(factory.mergeRunners(executorService, singleSegmentRunners)),
+ toolChest.mergeResults(factory.mergeRunners(state.executorService, singleSegmentRunners)),
toolChest
)
);
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/TimeseriesBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/TimeseriesBenchmark.java
index 875192a..98b3dd5 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/TimeseriesBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/TimeseriesBenchmark.java
@@ -19,9 +19,9 @@
package org.apache.druid.benchmark.query;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.data.input.InputRow;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.Intervals;
@@ -68,7 +68,10 @@ import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.generator.DataGenerator;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexCreator;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.SegmentId;
@@ -102,15 +105,15 @@ import java.util.concurrent.TimeUnit;
@Measurement(iterations = 25)
public class TimeseriesBenchmark
{
- @Param({"1"})
- private int numSegments;
-
@Param({"750000"})
private int rowsPerSegment;
@Param({"basic.A", "basic.timeFilterNumeric", "basic.timeFilterAlphanumeric", "basic.timeFilterByInterval"})
private String schemaAndQuery;
+ @Param({"true", "false"})
+ private boolean descending;
+
private static final Logger log = new Logger(TimeseriesBenchmark.class);
private static final int RNG_SEED = 9999;
private static final IndexMergerV9 INDEX_MERGER_V9;
@@ -121,16 +124,12 @@ public class TimeseriesBenchmark
NullHandling.initializeForTests();
}
- private List<IncrementalIndex> incIndexes;
- private List<QueryableIndex> qIndexes;
- private File tmpDir;
-
+ private AppendableIndexSpec appendableIndexSpec;
+ private DataGenerator generator;
private QueryRunnerFactory factory;
private GeneratorSchemaInfo schemaInfo;
private TimeseriesQuery query;
- private ExecutorService executorService;
-
static {
JSON_MAPPER = new DefaultObjectMapper();
INDEX_IO = new IndexIO(
@@ -171,7 +170,7 @@ public class TimeseriesBenchmark
.granularity(Granularities.ALL)
.intervals(intervalSpec)
.aggregators(queryAggs)
- .descending(false)
+ .descending(descending)
.build();
basicQueries.put("A", queryA);
@@ -191,7 +190,7 @@ public class TimeseriesBenchmark
.granularity(Granularities.ALL)
.intervals(intervalSpec)
.aggregators(queryAggs)
- .descending(false)
+ .descending(descending)
.build();
basicQueries.put("timeFilterNumeric", timeFilterQuery);
@@ -211,7 +210,7 @@ public class TimeseriesBenchmark
.granularity(Granularities.ALL)
.intervals(intervalSpec)
.aggregators(queryAggs)
- .descending(false)
+ .descending(descending)
.build();
basicQueries.put("timeFilterAlphanumeric", timeFilterQuery);
@@ -228,7 +227,7 @@ public class TimeseriesBenchmark
.granularity(Granularities.ALL)
.intervals(intervalSpec)
.aggregators(queryAggs)
- .descending(false)
+ .descending(descending)
.build();
basicQueries.put("timeFilterByInterval", timeFilterQuery);
@@ -238,15 +237,16 @@ public class TimeseriesBenchmark
SCHEMA_QUERY_MAP.put("basic", basicQueries);
}
+ /**
+ * Set up everything common for benchmarking both the incremental-index and the queryable-index.
+ */
@Setup
- public void setup() throws IOException
+ public void setup()
{
log.info("SETUP CALLED AT " + System.currentTimeMillis());
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
- executorService = Execs.multiThreaded(numSegments, "TimeseriesThreadPool");
-
setupQueries();
String[] schemaQuery = schemaAndQuery.split("\\.");
@@ -256,44 +256,12 @@ public class TimeseriesBenchmark
schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schemaName);
query = SCHEMA_QUERY_MAP.get(schemaName).get(queryName);
- incIndexes = new ArrayList<>();
- for (int i = 0; i < numSegments; i++) {
- log.info("Generating rows for segment " + i);
- DataGenerator gen = new DataGenerator(
- schemaInfo.getColumnSchemas(),
- RNG_SEED + i,
- schemaInfo.getDataInterval(),
- rowsPerSegment
- );
-
- IncrementalIndex incIndex = makeIncIndex();
-
- for (int j = 0; j < rowsPerSegment; j++) {
- InputRow row = gen.nextRow();
- if (j % 10000 == 0) {
- log.info(j + " rows generated.");
- }
- incIndex.add(row);
- }
- log.info(rowsPerSegment + " rows generated");
- incIndexes.add(incIndex);
- }
-
- tmpDir = FileUtils.createTempDir();
- log.info("Using temp dir: " + tmpDir.getAbsolutePath());
-
- qIndexes = new ArrayList<>();
- for (int i = 0; i < numSegments; i++) {
- File indexFile = INDEX_MERGER_V9.persist(
- incIndexes.get(i),
- tmpDir,
- new IndexSpec(),
- null
- );
-
- QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile);
- qIndexes.add(qIndex);
- }
+ generator = new DataGenerator(
+ schemaInfo.getColumnSchemas(),
+ RNG_SEED,
+ schemaInfo.getDataInterval(),
+ rowsPerSegment
+ );
factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(),
@@ -302,18 +270,97 @@ public class TimeseriesBenchmark
);
}
- @TearDown
- public void tearDown() throws IOException
+ /**
+ * Setup/teardown everything specific for benchmarking the incremental-index.
+ */
+ @State(Scope.Benchmark)
+ public static class IncrementalIndexState
{
- FileUtils.deleteDirectory(tmpDir);
+ @Param({"onheap", "offheap"})
+ private String indexType;
+
+ IncrementalIndex<?> incIndex;
+
+ @Setup
+ public void setup(TimeseriesBenchmark global) throws JsonProcessingException
+ {
+ // Creates an AppendableIndexSpec that corresponds to the indexType parametrization.
+ // It is used in {@code global.makeIncIndex()} to instantiate an incremental-index of the specified type.
+ global.appendableIndexSpec = IncrementalIndexCreator.parseIndexType(indexType);
+ incIndex = global.makeIncIndex();
+ global.generator.addToIndex(incIndex, global.rowsPerSegment);
+ }
+
+ @TearDown
+ public void tearDown()
+ {
+ if (incIndex != null) {
+ incIndex.close();
+ }
+ }
+ }
+
+ /**
+ * Setup/teardown everything specific for benchmarking the queryable-index.
+ */
+ @State(Scope.Benchmark)
+ public static class QueryableIndexState
+ {
+ @Param({"1"})
+ private int numSegments;
+
+ private ExecutorService executorService;
+ private File qIndexesDir;
+ private List<QueryableIndex> qIndexes;
+
+ @Setup
+ public void setup(TimeseriesBenchmark global) throws IOException
+ {
+ global.appendableIndexSpec = new OnheapIncrementalIndex.Spec();
+
+ executorService = Execs.multiThreaded(numSegments, "TimeseriesThreadPool");
+
+ qIndexesDir = FileUtils.createTempDir();
+ qIndexes = new ArrayList<>();
+
+ for (int i = 0; i < numSegments; i++) {
+ log.info("Generating rows for segment " + i);
+
+ IncrementalIndex<?> incIndex = global.makeIncIndex();
+ global.generator.reset(RNG_SEED + i).addToIndex(incIndex, global.rowsPerSegment);
+
+ File indexFile = INDEX_MERGER_V9.persist(
+ incIndex,
+ new File(qIndexesDir, String.valueOf(i)),
+ new IndexSpec(),
+ null
+ );
+ incIndex.close();
+
+ qIndexes.add(INDEX_IO.loadIndex(indexFile));
+ }
+ }
+
+ @TearDown
+ public void tearDown()
+ {
+ for (QueryableIndex index : qIndexes) {
+ if (index != null) {
+ index.close();
+ }
+ }
+ if (qIndexesDir != null) {
+ qIndexesDir.delete();
+ }
+ }
}
- private IncrementalIndex makeIncIndex()
+ private IncrementalIndex<?> makeIncIndex()
{
- return new IncrementalIndex.Builder()
+ return appendableIndexSpec.builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
.setMaxRowCount(rowsPerSegment)
- .buildOnheap();
+ .build();
}
private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)
@@ -331,12 +378,12 @@ public class TimeseriesBenchmark
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
- public void querySingleIncrementalIndex(Blackhole blackhole)
+ public void querySingleIncrementalIndex(Blackhole blackhole, IncrementalIndexState state)
{
QueryRunner<Result<TimeseriesResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
SegmentId.dummy("incIndex"),
- new IncrementalIndexSegment(incIndexes.get(0), SegmentId.dummy("incIndex"))
+ new IncrementalIndexSegment(state.incIndex, SegmentId.dummy("incIndex"))
);
List<Result<TimeseriesResultValue>> results = TimeseriesBenchmark.runQuery(factory, runner, query);
@@ -346,12 +393,12 @@ public class TimeseriesBenchmark
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
- public void querySingleQueryableIndex(Blackhole blackhole)
+ public void querySingleQueryableIndex(Blackhole blackhole, QueryableIndexState state)
{
final QueryRunner<Result<TimeseriesResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
SegmentId.dummy("qIndex"),
- new QueryableIndexSegment(qIndexes.get(0), SegmentId.dummy("qIndex"))
+ new QueryableIndexSegment(state.qIndexes.get(0), SegmentId.dummy("qIndex"))
);
List<Result<TimeseriesResultValue>> results = TimeseriesBenchmark.runQuery(factory, runner, query);
@@ -361,12 +408,12 @@ public class TimeseriesBenchmark
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
- public void queryFilteredSingleQueryableIndex(Blackhole blackhole)
+ public void queryFilteredSingleQueryableIndex(Blackhole blackhole, QueryableIndexState state)
{
final QueryRunner<Result<TimeseriesResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
SegmentId.dummy("qIndex"),
- new QueryableIndexSegment(qIndexes.get(0), SegmentId.dummy("qIndex"))
+ new QueryableIndexSegment(state.qIndexes.get(0), SegmentId.dummy("qIndex"))
);
DimFilter filter = new SelectorDimFilter("dimSequential", "399", null);
@@ -379,23 +426,23 @@ public class TimeseriesBenchmark
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
- public void queryMultiQueryableIndex(Blackhole blackhole)
+ public void queryMultiQueryableIndex(Blackhole blackhole, QueryableIndexState state)
{
List<QueryRunner<Result<TimeseriesResultValue>>> singleSegmentRunners = new ArrayList<>();
QueryToolChest toolChest = factory.getToolchest();
- for (int i = 0; i < numSegments; i++) {
+ for (int i = 0; i < state.numSegments; i++) {
SegmentId segmentId = SegmentId.dummy("qIndex " + i);
QueryRunner<Result<TimeseriesResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
segmentId,
- new QueryableIndexSegment(qIndexes.get(i), segmentId)
+ new QueryableIndexSegment(state.qIndexes.get(i), segmentId)
);
singleSegmentRunners.add(toolChest.preMergeQueryDecoration(runner));
}
QueryRunner theRunner = toolChest.postMergeQueryDecoration(
new FinalizeResultsQueryRunner<>(
- toolChest.mergeResults(factory.mergeRunners(executorService, singleSegmentRunners)),
+ toolChest.mergeResults(factory.mergeRunners(state.executorService, singleSegmentRunners)),
toolChest
)
);
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java
index b290f30..6587024 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java
@@ -19,10 +19,10 @@
package org.apache.druid.benchmark.query;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.collections.StupidPool;
import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.data.input.InputRow;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
@@ -65,7 +65,10 @@ import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.generator.DataGenerator;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexCreator;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.SegmentId;
@@ -99,9 +102,6 @@ import java.util.concurrent.TimeUnit;
@Measurement(iterations = 25)
public class TopNBenchmark
{
- @Param({"1"})
- private int numSegments;
-
@Param({"750000"})
private int rowsPerSegment;
@@ -121,16 +121,12 @@ public class TopNBenchmark
NullHandling.initializeForTests();
}
- private List<IncrementalIndex> incIndexes;
- private List<QueryableIndex> qIndexes;
-
+ private AppendableIndexSpec appendableIndexSpec;
+ private DataGenerator generator;
private QueryRunnerFactory factory;
private GeneratorSchemaInfo schemaInfo;
private TopNQueryBuilder queryBuilder;
private TopNQuery query;
- private File tmpDir;
-
- private ExecutorService executorService;
static {
JSON_MAPPER = new DefaultObjectMapper();
@@ -212,16 +208,16 @@ public class TopNBenchmark
SCHEMA_QUERY_MAP.put("basic", basicQueries);
}
-
+ /**
+ * Set up everything common for benchmarking both the incremental-index and the queryable-index.
+ */
@Setup
- public void setup() throws IOException
+ public void setup()
{
log.info("SETUP CALLED AT " + System.currentTimeMillis());
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
- executorService = Execs.multiThreaded(numSegments, "TopNThreadPool");
-
setupQueries();
String[] schemaQuery = schemaAndQuery.split("\\.");
@@ -233,44 +229,12 @@ public class TopNBenchmark
queryBuilder.threshold(threshold);
query = queryBuilder.build();
- incIndexes = new ArrayList<>();
- for (int i = 0; i < numSegments; i++) {
- log.info("Generating rows for segment " + i);
-
- DataGenerator gen = new DataGenerator(
- schemaInfo.getColumnSchemas(),
- RNG_SEED + i,
- schemaInfo.getDataInterval(),
- rowsPerSegment
- );
-
- IncrementalIndex incIndex = makeIncIndex();
-
- for (int j = 0; j < rowsPerSegment; j++) {
- InputRow row = gen.nextRow();
- if (j % 10000 == 0) {
- log.info(j + " rows generated.");
- }
- incIndex.add(row);
- }
- incIndexes.add(incIndex);
- }
-
- tmpDir = FileUtils.createTempDir();
- log.info("Using temp dir: " + tmpDir.getAbsolutePath());
-
- qIndexes = new ArrayList<>();
- for (int i = 0; i < numSegments; i++) {
- File indexFile = INDEX_MERGER_V9.persist(
- incIndexes.get(i),
- tmpDir,
- new IndexSpec(),
- null
- );
-
- QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile);
- qIndexes.add(qIndex);
- }
+ generator = new DataGenerator(
+ schemaInfo.getColumnSchemas(),
+ RNG_SEED,
+ schemaInfo.getDataInterval(),
+ rowsPerSegment
+ );
factory = new TopNQueryRunnerFactory(
new StupidPool<>(
@@ -284,18 +248,95 @@ public class TopNBenchmark
);
}
- @TearDown
- public void tearDown() throws IOException
+ /**
+ * Setup/teardown everything specific for benchmarking the incremental-index.
+ */
+ @State(Scope.Benchmark)
+ public static class IncrementalIndexState
{
- FileUtils.deleteDirectory(tmpDir);
+ @Param({"onheap", "offheap"})
+ private String indexType;
+
+ IncrementalIndex<?> incIndex;
+
+ @Setup
+ public void setup(TopNBenchmark global) throws JsonProcessingException
+ {
+ // Creates an AppendableIndexSpec that corresponds to the indexType parametrization.
+ // It is used in {@code global.makeIncIndex()} to instantiate an incremental-index of the specified type.
+ global.appendableIndexSpec = IncrementalIndexCreator.parseIndexType(indexType);
+ incIndex = global.makeIncIndex();
+ global.generator.addToIndex(incIndex, global.rowsPerSegment);
+ }
+
+ @TearDown
+ public void tearDown()
+ {
+ incIndex.close();
+ }
+ }
+
+ /**
+ * Setup/teardown everything specific for benchmarking the queryable-index.
+ */
+ @State(Scope.Benchmark)
+ public static class QueryableIndexState
+ {
+ @Param({"1"})
+ private int numSegments;
+
+ private ExecutorService executorService;
+ private File qIndexesDir;
+ private List<QueryableIndex> qIndexes;
+
+ @Setup
+ public void setup(TopNBenchmark global) throws IOException
+ {
+ global.appendableIndexSpec = new OnheapIncrementalIndex.Spec();
+
+ executorService = Execs.multiThreaded(numSegments, "TopNThreadPool");
+
+ qIndexesDir = FileUtils.createTempDir();
+ qIndexes = new ArrayList<>();
+
+ for (int i = 0; i < numSegments; i++) {
+ log.info("Generating rows for segment " + i);
+
+ IncrementalIndex<?> incIndex = global.makeIncIndex();
+ global.generator.reset(RNG_SEED + i).addToIndex(incIndex, global.rowsPerSegment);
+
+ File indexFile = INDEX_MERGER_V9.persist(
+ incIndex,
+ new File(qIndexesDir, String.valueOf(i)),
+ new IndexSpec(),
+ null
+ );
+ incIndex.close();
+
+ qIndexes.add(INDEX_IO.loadIndex(indexFile));
+ }
+ }
+
+ @TearDown
+ public void tearDown()
+ {
+ for (QueryableIndex index : qIndexes) {
+ if (index != null) {
+ index.close();
+ }
+ }
+ if (qIndexesDir != null) {
+ qIndexesDir.delete();
+ }
+ }
}
- private IncrementalIndex makeIncIndex()
+ private IncrementalIndex<?> makeIncIndex()
{
- return new IncrementalIndex.Builder()
+ return appendableIndexSpec.builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
.setMaxRowCount(rowsPerSegment)
- .buildOnheap();
+ .build();
}
private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)
@@ -314,12 +355,12 @@ public class TopNBenchmark
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
- public void querySingleIncrementalIndex(Blackhole blackhole)
+ public void querySingleIncrementalIndex(Blackhole blackhole, IncrementalIndexState state)
{
QueryRunner<Result<TopNResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
SegmentId.dummy("incIndex"),
- new IncrementalIndexSegment(incIndexes.get(0), SegmentId.dummy("incIndex"))
+ new IncrementalIndexSegment(state.incIndex, SegmentId.dummy("incIndex"))
);
List<Result<TopNResultValue>> results = TopNBenchmark.runQuery(factory, runner, query);
@@ -329,12 +370,12 @@ public class TopNBenchmark
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
- public void querySingleQueryableIndex(Blackhole blackhole)
+ public void querySingleQueryableIndex(Blackhole blackhole, QueryableIndexState state)
{
final QueryRunner<Result<TopNResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
SegmentId.dummy("qIndex"),
- new QueryableIndexSegment(qIndexes.get(0), SegmentId.dummy("qIndex"))
+ new QueryableIndexSegment(state.qIndexes.get(0), SegmentId.dummy("qIndex"))
);
List<Result<TopNResultValue>> results = TopNBenchmark.runQuery(factory, runner, query);
@@ -344,23 +385,23 @@ public class TopNBenchmark
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
- public void queryMultiQueryableIndex(Blackhole blackhole)
+ public void queryMultiQueryableIndex(Blackhole blackhole, QueryableIndexState state)
{
List<QueryRunner<Result<TopNResultValue>>> singleSegmentRunners = new ArrayList<>();
QueryToolChest toolChest = factory.getToolchest();
- for (int i = 0; i < numSegments; i++) {
+ for (int i = 0; i < state.numSegments; i++) {
SegmentId segmentId = SegmentId.dummy("qIndex " + i);
QueryRunner<Result<TopNResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
factory,
segmentId,
- new QueryableIndexSegment(qIndexes.get(i), segmentId)
+ new QueryableIndexSegment(state.qIndexes.get(i), segmentId)
);
singleSegmentRunners.add(toolChest.preMergeQueryDecoration(runner));
}
QueryRunner theRunner = toolChest.postMergeQueryDecoration(
new FinalizeResultsQueryRunner<>(
- toolChest.mergeResults(factory.mergeRunners(executorService, singleSegmentRunners)),
+ toolChest.mergeResults(factory.mergeRunners(state.executorService, singleSegmentRunners)),
toolChest
)
);
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/timecompare/TimeCompareBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/timecompare/TimeCompareBenchmark.java
index 406e627..0208fe1 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/timecompare/TimeCompareBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/timecompare/TimeCompareBenchmark.java
@@ -73,6 +73,7 @@ import org.apache.druid.segment.generator.DataGenerator;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.SegmentId;
@@ -404,10 +405,10 @@ public class TimeCompareBenchmark
private IncrementalIndex makeIncIndex()
{
- return new IncrementalIndex.Builder()
+ return new OnheapIncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
.setMaxRowCount(rowsPerSegment)
- .buildOnheap();
+ .build();
}
@Benchmark
diff --git a/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java b/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java
index 6c7db8e..3da07ff 100644
--- a/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java
+++ b/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java
@@ -42,6 +42,7 @@ import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.After;
import org.junit.Before;
@@ -78,7 +79,7 @@ public class DistinctCountGroupByQueryTest extends InitializedNullHandlingTest
@Test
public void testGroupByWithDistinctCountAgg() throws Exception
{
- IncrementalIndex index = new IncrementalIndex.Builder()
+ IncrementalIndex index = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.SECOND)
@@ -87,7 +88,7 @@ public class DistinctCountGroupByQueryTest extends InitializedNullHandlingTest
)
.setConcurrentEventAdd(true)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
String visitor_id = "visitor_id";
String client_type = "client_type";
diff --git a/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTimeseriesQueryTest.java b/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTimeseriesQueryTest.java
index 2cc0526..f553bfc 100644
--- a/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTimeseriesQueryTest.java
+++ b/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTimeseriesQueryTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.joda.time.DateTime;
import org.junit.Test;
@@ -50,7 +51,7 @@ public class DistinctCountTimeseriesQueryTest extends InitializedNullHandlingTes
{
TimeseriesQueryEngine engine = new TimeseriesQueryEngine();
- IncrementalIndex index = new IncrementalIndex.Builder()
+ IncrementalIndex index = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.SECOND)
@@ -58,7 +59,7 @@ public class DistinctCountTimeseriesQueryTest extends InitializedNullHandlingTes
.build()
)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
String visitor_id = "visitor_id";
String client_type = "client_type";
diff --git a/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java b/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java
index 7b14fba..ef1344c 100644
--- a/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java
+++ b/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java
@@ -37,6 +37,7 @@ import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.joda.time.DateTime;
import org.junit.After;
@@ -80,7 +81,7 @@ public class DistinctCountTopNQueryTest extends InitializedNullHandlingTest
{
TopNQueryEngine engine = new TopNQueryEngine(pool);
- IncrementalIndex index = new IncrementalIndex.Builder()
+ IncrementalIndex index = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.SECOND)
@@ -88,7 +89,7 @@ public class DistinctCountTopNQueryTest extends InitializedNullHandlingTest
.build()
)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
String visitor_id = "visitor_id";
String client_type = "client_type";
diff --git a/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnTestBase.java b/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnTestBase.java
index 4d2164d..87286b4 100644
--- a/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnTestBase.java
+++ b/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnTestBase.java
@@ -27,6 +27,7 @@ import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.testing.InitializedNullHandlingTest;
import java.io.IOException;
@@ -62,10 +63,10 @@ public class MapVirtualColumnTestBase extends InitializedNullHandlingTest
.build();
return TestIndex.loadIncrementalIndex(
- () -> new IncrementalIndex.Builder()
+ () -> new OnheapIncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(10000)
- .buildOnheap(),
+ .build(),
input,
parser
);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
index 9d30023..2831734 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
@@ -45,6 +45,7 @@ import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.indexing.DataSchema;
import javax.annotation.Nullable;
@@ -230,8 +231,8 @@ public class InputSourceSampler
.withRollup(dataSchema.getGranularitySpec().isRollup())
.build();
- return new IncrementalIndex.Builder().setIndexSchema(schema)
+ return new OnheapIncrementalIndex.Builder().setIndexSchema(schema)
.setMaxRowCount(samplerConfig.getNumRows())
- .buildOnheap();
+ .build();
}
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
index 6ab4161..01a1d24 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
@@ -73,6 +73,7 @@ import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.loading.LocalDataSegmentPuller;
import org.apache.druid.segment.loading.LocalLoadSpec;
import org.apache.druid.segment.realtime.firehose.CombiningFirehoseFactory;
@@ -196,10 +197,10 @@ public class IngestSegmentFirehoseFactoryTest
new DoubleSumAggregatorFactory(METRIC_FLOAT_NAME, DIM_FLOAT_NAME)
)
.build();
- final IncrementalIndex index = new IncrementalIndex.Builder()
+ final IncrementalIndex index = new OnheapIncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(MAX_ROWS * MAX_SHARD_NUMBER)
- .buildOnheap();
+ .build();
for (Integer i = 0; i < MAX_ROWS; ++i) {
index.add(ROW_PARSER.parseBatch(buildRow(i.longValue())).get(0));
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
index 06ebc56..400f7da 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
@@ -51,6 +51,7 @@ import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.timeline.DataSegment;
@@ -250,10 +251,10 @@ public class IngestSegmentFirehoseFactoryTimelineTest
.withDimensionsSpec(ROW_PARSER)
.withMetrics(new LongSumAggregatorFactory(METRICS[0], METRICS[0]))
.build();
- final IncrementalIndex index = new IncrementalIndex.Builder()
+ final IncrementalIndex index = new OnheapIncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(rows.length)
- .buildOnheap();
+ .build();
for (InputRow row : rows) {
try {
diff --git a/processing/src/main/java/org/apache/druid/segment/generator/DataGenerator.java b/processing/src/main/java/org/apache/druid/segment/generator/DataGenerator.java
index f2fac21..f9aae18 100644
--- a/processing/src/main/java/org/apache/druid/segment/generator/DataGenerator.java
+++ b/processing/src/main/java/org/apache/druid/segment/generator/DataGenerator.java
@@ -24,17 +24,22 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.joda.time.Interval;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class DataGenerator
{
private final List<GeneratorColumnSchema> columnSchemas;
- private final long seed;
private List<ColumnValueGenerator> columnGenerators;
private final long startTime;
@@ -46,6 +51,8 @@ public class DataGenerator
private int timeCounter;
private List<String> dimensionNames;
+ private static final Logger log = new Logger(DataGenerator.class);
+
public DataGenerator(
List<GeneratorColumnSchema> columnSchemas,
final long seed,
@@ -55,7 +62,6 @@ public class DataGenerator
)
{
this.columnSchemas = columnSchemas;
- this.seed = seed;
this.startTime = startTime;
this.endTime = Long.MAX_VALUE;
@@ -63,7 +69,7 @@ public class DataGenerator
this.timestampIncrement = timestampIncrement;
this.currentTime = startTime;
- init();
+ reset(seed);
}
public DataGenerator(
@@ -74,7 +80,6 @@ public class DataGenerator
)
{
this.columnSchemas = columnSchemas;
- this.seed = seed;
this.startTime = interval.getStartMillis();
this.endTime = interval.getEndMillis() - 1;
@@ -85,7 +90,7 @@ public class DataGenerator
this.timestampIncrement = timeDelta / (numRows * 1.0);
this.numConsecutiveTimestamps = 0;
- init();
+ reset(seed);
}
public InputRow nextRow()
@@ -98,7 +103,12 @@ public class DataGenerator
return row;
}
- private void init()
+ /**
+ * Reset this generator to start from the beginning of the interval with a new seed.
+ *
+ * @param seed the new seed to generate rows from
+ */
+ public DataGenerator reset(long seed)
{
this.timeCounter = 0;
this.currentTime = startTime;
@@ -126,6 +136,8 @@ public class DataGenerator
}
)
);
+
+ return this;
}
private long nextTimestamp()
@@ -143,4 +155,71 @@ public class DataGenerator
}
}
+ /**
+ * Initialize a Java Stream generator for InputRow from this DataGenerator.
+ * The generator will log its progress once every 10,000 rows.
+ *
+ * @param numOfRows the number of rows to generate
+ * @return a generator
+ */
+ private Stream<InputRow> generator(int numOfRows)
+ {
+ return Stream.generate(
+ new Supplier<InputRow>()
+ {
+ int i = 0;
+
+ @Override
+ public InputRow get()
+ {
+ InputRow row = DataGenerator.this.nextRow();
+ i++;
+ if (i % 10_000 == 0) {
+ log.info("%,d/%,d rows generated.", i, numOfRows);
+ }
+ return row;
+ }
+ }
+ ).limit(numOfRows);
+ }
+
+ /**
+ * Add rows from any generator to an index.
+ *
+ * @param stream the stream of rows to add
+ * @param index the index to add rows to
+ */
+ public static void addStreamToIndex(Stream<InputRow> stream, IncrementalIndex<?> index)
+ {
+ stream.forEachOrdered(row -> {
+ try {
+ index.add(row);
+ }
+ catch (IndexSizeExceededException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ /**
+ * Add rows from this generator to an index.
+ *
+ * @param index the index to add rows to
+ * @param numOfRows the number of rows to add
+ */
+ public void addToIndex(IncrementalIndex<?> index, int numOfRows)
+ {
+ addStreamToIndex(generator(numOfRows), index);
+ }
+
+ /**
+ * Put rows from this generator to a list.
+ *
+ * @param numOfRows the number of rows to put in the list
+ * @return a List of InputRow
+ */
+ public List<InputRow> toList(int numOfRows)
+ {
+ return generator(numOfRows).collect(Collectors.toList());
+ }
}
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
index 220f0e3..faf164f 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
@@ -35,7 +35,7 @@ public abstract class AppendableIndexBuilder
protected int maxRowCount = 0;
protected long maxBytesInMemory = 0;
- protected final Logger log = new Logger(this.getClass().getName());
+ protected final Logger log = new Logger(this.getClass());
public AppendableIndexBuilder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema)
{
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
index bbef755..3e3ab54 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
@@ -321,66 +321,6 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
}
}
- /**
- * This class exists only as backward competability to reduce the number of modified lines.
- */
- public static class Builder extends OnheapIncrementalIndex.Builder
- {
- @Override
- public Builder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema)
- {
- return (Builder) super.setIndexSchema(incrementalIndexSchema);
- }
-
- @Override
- public Builder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
- {
- return (Builder) super.setSimpleTestingIndexSchema(metrics);
- }
-
- @Override
- public Builder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics)
- {
- return (Builder) super.setSimpleTestingIndexSchema(rollup, metrics);
- }
-
- @Override
- public Builder setDeserializeComplexMetrics(final boolean deserializeComplexMetrics)
- {
- return (Builder) super.setDeserializeComplexMetrics(deserializeComplexMetrics);
- }
-
- @Override
- public Builder setConcurrentEventAdd(final boolean concurrentEventAdd)
- {
- return (Builder) super.setConcurrentEventAdd(concurrentEventAdd);
- }
-
- @Override
- public Builder setSortFacts(final boolean sortFacts)
- {
- return (Builder) super.setSortFacts(sortFacts);
- }
-
- @Override
- public Builder setMaxRowCount(final int maxRowCount)
- {
- return (Builder) super.setMaxRowCount(maxRowCount);
- }
-
- @Override
- public Builder setMaxBytesInMemory(final long maxBytesInMemory)
- {
- return (Builder) super.setMaxBytesInMemory(maxBytesInMemory);
- }
-
- public OnheapIncrementalIndex buildOnheap()
- {
- return (OnheapIncrementalIndex) build();
- }
- }
-
-
public abstract FactsHolder getFacts();
public abstract boolean canAppendRow();
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
index b3cdabc..a74f94f 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
@@ -150,18 +150,13 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
boolean skipMaxRowsInMemoryCheck // ignored, we always want to check this for offheap
) throws IndexSizeExceededException
{
- ByteBuffer aggBuffer;
- int bufferIndex;
- int bufferOffset;
-
synchronized (this) {
final AggregatorFactory[] metrics = getMetrics();
final int priorIndex = facts.getPriorIndex(key);
if (IncrementalIndexRow.EMPTY_ROW_INDEX != priorIndex) {
final int[] indexAndOffset = indexAndOffsets.get(priorIndex);
- bufferIndex = indexAndOffset[0];
- bufferOffset = indexAndOffset[1];
- aggBuffer = aggBuffers.get(bufferIndex).get();
+ ByteBuffer aggBuffer = aggBuffers.get(indexAndOffset[0]).get();
+ return aggregate(row, rowContainer, aggBuffer, indexAndOffset[1]);
} else {
if (metrics.length > 0 && getAggs()[0] == null) {
// note: creation of Aggregators is done lazily when at least one row from input is available
@@ -174,7 +169,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
rowContainer.set(null);
}
- bufferIndex = aggBuffers.size() - 1;
+ int bufferIndex = aggBuffers.size() - 1;
ByteBuffer lastBuffer = aggBuffers.isEmpty() ? null : aggBuffers.get(aggBuffers.size() - 1).get();
int[] lastAggregatorsIndexAndOffset = indexAndOffsets.isEmpty()
? null
@@ -184,7 +179,8 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
throw new ISE("last row's aggregate's buffer and last buffer index must be same");
}
- bufferOffset = aggsTotalSize + (lastAggregatorsIndexAndOffset != null ? lastAggregatorsIndexAndOffset[1] : 0);
+ int bufferOffset = aggsTotalSize + (lastAggregatorsIndexAndOffset != null ? lastAggregatorsIndexAndOffset[1] : 0);
+ ByteBuffer aggBuffer;
if (lastBuffer != null &&
lastBuffer.capacity() - bufferOffset >= aggsTotalSize) {
aggBuffer = lastBuffer;
@@ -207,8 +203,9 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
final int rowIndex = indexIncrement.getAndIncrement();
- // note that indexAndOffsets must be updated before facts, because as soon as we update facts
- // concurrent readers get hold of it and might ask for newly added row
+ // note that we must update indexAndOffsets and the aggregator's buffers before facts, because as soon as we
+ // update facts concurrent readers get hold of it and might ask for newly added row
+ AddToFactsResult res = aggregate(row, rowContainer, aggBuffer, bufferOffset);
indexAndOffsets.add(new int[]{bufferIndex, bufferOffset});
final int prev = facts.putIfAbsent(key, rowIndex);
if (IncrementalIndexRow.EMPTY_ROW_INDEX == prev) {
@@ -216,12 +213,22 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
} else {
throw new ISE("Unexpected state: Concurrent fact addition.");
}
+
+ return res;
}
}
+ }
- rowContainer.set(row);
-
+ public AddToFactsResult aggregate(
+ InputRow row,
+ ThreadLocal<InputRow> rowContainer,
+ ByteBuffer aggBuffer,
+ int bufferOffset
+ )
+ {
final List<String> parseExceptionMessages = new ArrayList<>();
+
+ rowContainer.set(row);
for (int i = 0; i < getMetrics().length; i++) {
final BufferAggregator agg = getAggs()[i];
@@ -237,9 +244,11 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
}
}
rowContainer.set(null);
+
return new AddToFactsResult(getNumEntries().get(), 0, parseExceptionMessages);
}
+
@Override
public int getLastRowIndex()
{
diff --git a/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java b/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java
index d6d27b8..ee58422 100644
--- a/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java
+++ b/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java
@@ -55,6 +55,7 @@ import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
@@ -321,10 +322,10 @@ public class DoubleStorageTest
)
.build();
- final IncrementalIndex index = new IncrementalIndex.Builder()
+ final IncrementalIndex index = new OnheapIncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(MAX_ROWS)
- .buildOnheap();
+ .build();
getStreamOfEvents().forEach(o -> {
diff --git a/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java b/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java
index 33e16db..be6b207 100644
--- a/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java
+++ b/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java
@@ -63,6 +63,7 @@ import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
@@ -140,10 +141,10 @@ public class MultiValuedDimensionTest extends InitializedNullHandlingTest
@Before
public void setup() throws Exception
{
- incrementalIndex = new IncrementalIndex.Builder()
+ incrementalIndex = new OnheapIncrementalIndex.Builder()
.setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
.setMaxRowCount(5000)
- .buildOnheap();
+ .build();
StringInputRowParser parser = new StringInputRowParser(
new CSVParseSpec(
@@ -183,10 +184,10 @@ public class MultiValuedDimensionTest extends InitializedNullHandlingTest
"UTF-8"
);
- incrementalIndexNullSampler = new IncrementalIndex.Builder()
+ incrementalIndexNullSampler = new OnheapIncrementalIndex.Builder()
.setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
.setMaxRowCount(5000)
- .buildOnheap();
+ .build();
String[] rowsNullSampler = new String[]{
"{\"time\":\"2011-01-13T00:00:00.000Z\",\"product\":\"product_1\",\"tags\":[],\"othertags\":[\"u1\", \"u2\"]}",
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
index c599333..4a7df06 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
@@ -82,6 +82,7 @@ import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.SegmentId;
import org.junit.rules.TemporaryFolder;
@@ -479,7 +480,7 @@ public class AggregationTestHelper implements Closeable
List<File> toMerge = new ArrayList<>();
try {
- index = new IncrementalIndex.Builder()
+ index = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(minTimestamp)
@@ -491,7 +492,7 @@ public class AggregationTestHelper implements Closeable
)
.setDeserializeComplexMetrics(deserializeComplexMetrics)
.setMaxRowCount(maxRowCount)
- .buildOnheap();
+ .build();
while (rows.hasNext()) {
Object row = rows.next();
@@ -500,7 +501,7 @@ public class AggregationTestHelper implements Closeable
toMerge.add(tmp);
indexMerger.persist(index, tmp, new IndexSpec(), null);
index.close();
- index = new IncrementalIndex.Builder()
+ index = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(minTimestamp)
@@ -512,7 +513,7 @@ public class AggregationTestHelper implements Closeable
)
.setDeserializeComplexMetrics(deserializeComplexMetrics)
.setMaxRowCount(maxRowCount)
- .buildOnheap();
+ .build();
}
if (row instanceof String && parser instanceof StringInputRowParser) {
//Note: this is required because StringInputRowParser is InputRowParser<ByteBuffer> as opposed to
@@ -570,7 +571,7 @@ public class AggregationTestHelper implements Closeable
boolean rollup
) throws Exception
{
- IncrementalIndex index = new IncrementalIndex.Builder()
+ IncrementalIndex index = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(minTimestamp)
@@ -582,7 +583,7 @@ public class AggregationTestHelper implements Closeable
)
.setDeserializeComplexMetrics(deserializeComplexMetrics)
.setMaxRowCount(maxRowCount)
- .buildOnheap();
+ .build();
while (rows.hasNext()) {
Object row = rows.next();
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
index 445c6c9..3f83308 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
@@ -42,6 +42,7 @@ import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.joda.time.DateTime;
@@ -69,7 +70,7 @@ public class StringFirstTimeseriesQueryTest extends InitializedNullHandlingTest
final SerializablePairLongStringSerde serde = new SerializablePairLongStringSerde();
ComplexMetrics.registerSerde(serde.getTypeName(), serde);
- incrementalIndex = new IncrementalIndex.Builder()
+ incrementalIndex = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.SECOND)
@@ -78,7 +79,7 @@ public class StringFirstTimeseriesQueryTest extends InitializedNullHandlingTest
.build()
)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
incrementalIndex.add(
new MapBasedInputRow(
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java
index 7765ec4..ae79cb5 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java
@@ -42,6 +42,7 @@ import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.joda.time.DateTime;
import org.junit.Before;
@@ -68,7 +69,7 @@ public class StringLastTimeseriesQueryTest
final SerializablePairLongStringSerde serde = new SerializablePairLongStringSerde();
ComplexMetrics.registerSerde(serde.getTypeName(), serde);
- incrementalIndex = new IncrementalIndex.Builder()
+ incrementalIndex = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withQueryGranularity(Granularities.SECOND)
@@ -77,7 +78,7 @@ public class StringLastTimeseriesQueryTest
.build()
)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
incrementalIndex.add(
new MapBasedInputRow(
diff --git a/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java b/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java
index d22ba99..5ac6b58 100644
--- a/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java
@@ -41,6 +41,7 @@ import org.apache.druid.query.context.ConcurrentResponseContext;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.timeline.LogicalSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.DateTime;
@@ -114,10 +115,10 @@ public class DataSourceMetadataQueryTest
@Test
public void testMaxIngestedEventTime() throws Exception
{
- final IncrementalIndex rtIndex = new IncrementalIndex.Builder()
+ final IncrementalIndex rtIndex = new OnheapIncrementalIndex.Builder()
.setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
final QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner(
new DataSourceMetadataQueryRunnerFactory(
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
index a2e994e..f22701d 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
@@ -73,6 +73,7 @@ import org.apache.druid.segment.Segment;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.SegmentId;
@@ -133,7 +134,7 @@ public class GroupByLimitPushDownInsufficientBufferTest extends InitializedNullH
private IncrementalIndex makeIncIndex(boolean withRollup)
{
- return new IncrementalIndex.Builder()
+ return new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withDimensionsSpec(new DimensionsSpec(
@@ -149,7 +150,7 @@ public class GroupByLimitPushDownInsufficientBufferTest extends InitializedNullH
)
.setConcurrentEventAdd(true)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
}
@Before
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
index 357a1e2..3c62e11 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
@@ -79,6 +79,7 @@ import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.SegmentId;
@@ -140,7 +141,7 @@ public class GroupByLimitPushDownMultiNodeMergeTest
private IncrementalIndex makeIncIndex(boolean withRollup)
{
- return new IncrementalIndex.Builder()
+ return new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withDimensionsSpec(new DimensionsSpec(
@@ -156,7 +157,7 @@ public class GroupByLimitPushDownMultiNodeMergeTest
)
.setConcurrentEventAdd(true)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
}
@Before
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
index 4f9b1ff..7b73cc1 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
@@ -70,6 +70,7 @@ import org.apache.druid.segment.Segment;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.SegmentId;
import org.junit.After;
@@ -127,7 +128,7 @@ public class GroupByMultiSegmentTest
private IncrementalIndex makeIncIndex(boolean withRollup)
{
- return new IncrementalIndex.Builder()
+ return new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withDimensionsSpec(new DimensionsSpec(
@@ -143,7 +144,7 @@ public class GroupByMultiSegmentTest
)
.setConcurrentEventAdd(true)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
}
@Before
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactoryTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactoryTest.java
index 5c62fed..6f84d57 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactoryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactoryTest.java
@@ -42,6 +42,7 @@ import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.timeline.SegmentId;
import org.junit.After;
import org.junit.Before;
@@ -138,11 +139,11 @@ public class GroupByQueryRunnerFactoryTest
private Segment createSegment() throws Exception
{
- IncrementalIndex incrementalIndex = new IncrementalIndex.Builder()
+ IncrementalIndex incrementalIndex = new OnheapIncrementalIndex.Builder()
.setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
.setConcurrentEventAdd(true)
.setMaxRowCount(5000)
- .buildOnheap();
+ .build();
StringInputRowParser parser = new StringInputRowParser(
new CSVParseSpec(
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
index 9d793c7..8a97e89 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
@@ -77,6 +77,7 @@ import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.SegmentId;
import org.junit.After;
@@ -125,7 +126,7 @@ public class NestedQueryPushDownTest
private IncrementalIndex makeIncIndex()
{
- return new IncrementalIndex.Builder()
+ return new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withDimensionsSpec(new DimensionsSpec(
@@ -142,7 +143,7 @@ public class NestedQueryPushDownTest
)
.setConcurrentEventAdd(true)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
}
@Before
diff --git a/processing/src/test/java/org/apache/druid/query/metadata/SegmentAnalyzerTest.java b/processing/src/test/java/org/apache/druid/query/metadata/SegmentAnalyzerTest.java
index 6f56e0d..11e00f7 100644
--- a/processing/src/test/java/org/apache/druid/query/metadata/SegmentAnalyzerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/metadata/SegmentAnalyzerTest.java
@@ -51,6 +51,7 @@ import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.ComplexMetrics;
@@ -275,10 +276,10 @@ public class SegmentAnalyzerTest extends InitializedNullHandlingTest
.withRollup(true)
.build();
- final IncrementalIndex retVal = new IncrementalIndex.Builder()
+ final IncrementalIndex retVal = new OnheapIncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(10000)
- .buildOnheap();
+ .build();
IncrementalIndex incrementalIndex = TestIndex.loadIncrementalIndex(retVal, source);
QueryableIndex queryableIndex = TestIndex.persistRealtimeAndLoadMMapped(incrementalIndex);
SegmentAnalyzer analyzer = new SegmentAnalyzer(EnumSet.of(SegmentMetadataQuery.AnalysisType.SIZE));
diff --git a/processing/src/test/java/org/apache/druid/query/scan/MultiSegmentScanQueryTest.java b/processing/src/test/java/org/apache/druid/query/scan/MultiSegmentScanQueryTest.java
index a579df0..93f46d3 100644
--- a/processing/src/test/java/org/apache/druid/query/scan/MultiSegmentScanQueryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/scan/MultiSegmentScanQueryTest.java
@@ -46,6 +46,7 @@ import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.joda.time.Interval;
@@ -151,10 +152,10 @@ public class MultiSegmentScanQueryTest extends NullHandlingTest
.withQueryGranularity(Granularities.HOUR)
.withMetrics(TestIndex.METRIC_AGGS)
.build();
- return new IncrementalIndex.Builder()
+ return new OnheapIncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(maxRowCount)
- .buildOnheap();
+ .build();
}
@AfterClass
diff --git a/processing/src/test/java/org/apache/druid/query/search/SearchQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/search/SearchQueryRunnerTest.java
index aebcf25..16a3590 100644
--- a/processing/src/test/java/org/apache/druid/query/search/SearchQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/search/SearchQueryRunnerTest.java
@@ -59,6 +59,7 @@ import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.SegmentId;
import org.junit.Assert;
@@ -719,14 +720,14 @@ public class SearchQueryRunnerTest extends InitializedNullHandlingTest
@Test
public void testSearchWithNullValueInDimension() throws Exception
{
- IncrementalIndex<Aggregator> index = new IncrementalIndex.Builder()
+ IncrementalIndex<Aggregator> index = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DateTimes.of("2011-01-12T00:00:00.000Z").getMillis())
.build()
)
.setMaxRowCount(10)
- .buildOnheap();
+ .build();
index.add(
new MapBasedInputRow(
diff --git a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java
index ecbe8af..6834cec 100644
--- a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java
@@ -41,6 +41,7 @@ import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NoneShardSpec;
@@ -117,10 +118,10 @@ public class TimeBoundaryQueryRunnerTest
.withQueryGranularity(Granularities.HOUR)
.withMetrics(TestIndex.METRIC_AGGS)
.build();
- return new IncrementalIndex.Builder()
+ return new OnheapIncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(maxRowCount)
- .buildOnheap();
+ .build();
}
private static SegmentId makeIdentifier(IncrementalIndex index, String version)
diff --git a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java
index a0d33e1..8413c7e 100644
--- a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java
+++ b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java
@@ -38,6 +38,7 @@ import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.timeline.SegmentId;
import org.junit.Assert;
import org.junit.Test;
@@ -66,14 +67,14 @@ public class TimeseriesQueryRunnerBonusTest
@Test
public void testOneRowAtATime() throws Exception
{
- final IncrementalIndex oneRowIndex = new IncrementalIndex.Builder()
+ final IncrementalIndex oneRowIndex = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DateTimes.of("2012-01-01T00:00:00Z").getMillis())
.build()
)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
List<Result<TimeseriesResultValue>> results;
diff --git a/processing/src/test/java/org/apache/druid/segment/EmptyIndexTest.java b/processing/src/test/java/org/apache/druid/segment/EmptyIndexTest.java
index caeab96..8264460 100644
--- a/processing/src/test/java/org/apache/druid/segment/EmptyIndexTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/EmptyIndexTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexAdapter;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
@@ -72,10 +73,10 @@ public class EmptyIndexTest
}
try {
- IncrementalIndex emptyIndex = new IncrementalIndex.Builder()
+ IncrementalIndex emptyIndex = new OnheapIncrementalIndex.Builder()
.setSimpleTestingIndexSchema(/* empty */)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
IncrementalIndexAdapter emptyIndexAdapter = new IncrementalIndexAdapter(
Intervals.of("2012-08-01/P3D"),
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java b/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java
index 2ed65fd..c5d0b3a 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java
@@ -32,6 +32,7 @@ import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.timeline.SegmentId;
@@ -226,10 +227,10 @@ public class IndexBuilder
)
{
Preconditions.checkNotNull(schema, "schema");
- final IncrementalIndex incrementalIndex = new IncrementalIndex.Builder()
+ final IncrementalIndex incrementalIndex = new OnheapIncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(maxRows)
- .buildOnheap();
+ .build();
for (InputRow row : rows) {
try {
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexIOTest.java b/processing/src/test/java/org/apache/druid/segment/IndexIOTest.java
index 2fb6bc3..5066bef 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexIOTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexIOTest.java
@@ -40,6 +40,7 @@ import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexAdapter;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.joda.time.Interval;
import org.junit.Assert;
@@ -253,7 +254,7 @@ public class IndexIOTest extends InitializedNullHandlingTest
this.exception = exception;
}
- final IncrementalIndex<Aggregator> incrementalIndex1 = new IncrementalIndex.Builder()
+ final IncrementalIndex<Aggregator> incrementalIndex1 = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DEFAULT_INTERVAL.getStart().getMillis())
@@ -268,9 +269,9 @@ public class IndexIOTest extends InitializedNullHandlingTest
.build()
)
.setMaxRowCount(1000000)
- .buildOnheap();
+ .build();
- final IncrementalIndex<Aggregator> incrementalIndex2 = new IncrementalIndex.Builder()
+ final IncrementalIndex<Aggregator> incrementalIndex2 = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DEFAULT_INTERVAL.getStart().getMillis())
@@ -285,7 +286,7 @@ public class IndexIOTest extends InitializedNullHandlingTest
.build()
)
.setMaxRowCount(1000000)
- .buildOnheap();
+ .build();
IndexableAdapter adapter1;
IndexableAdapter adapter2;
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
index b16429f..124554e 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
@@ -57,6 +57,7 @@ import org.apache.druid.segment.data.IncrementalIndexTest;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexAdapter;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.joda.time.Interval;
@@ -270,10 +271,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
- IncrementalIndex toPersist2 = new IncrementalIndex.Builder()
+ IncrementalIndex toPersist2 = new OnheapIncrementalIndex.Builder()
.setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
toPersist2.add(
new MapBasedInputRow(
@@ -344,15 +345,15 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
@Test
public void testPersistEmptyColumn() throws Exception
{
- final IncrementalIndex toPersist1 = new IncrementalIndex.Builder()
+ final IncrementalIndex toPersist1 = new OnheapIncrementalIndex.Builder()
.setSimpleTestingIndexSchema(/* empty */)
.setMaxRowCount(10)
- .buildOnheap();
+ .build();
- final IncrementalIndex toPersist2 = new IncrementalIndex.Builder()
+ final IncrementalIndex toPersist2 = new OnheapIncrementalIndex.Builder()
.setSimpleTestingIndexSchema(/* empty */)
.setMaxRowCount(10)
- .buildOnheap();
+ .build();
final File tmpDir1 = temporaryFolder.newFolder();
final File tmpDir2 = temporaryFolder.newFolder();
@@ -840,18 +841,18 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
.build();
- IncrementalIndex toPersist1 = new IncrementalIndex.Builder()
+ IncrementalIndex toPersist1 = new OnheapIncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(1000)
- .buildOnheap();
- IncrementalIndex toPersist2 = new IncrementalIndex.Builder()
+ .build();
+ IncrementalIndex toPersist2 = new OnheapIncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(1000)
- .buildOnheap();
- IncrementalIndex toPersist3 = new IncrementalIndex.Builder()
+ .build();
+ IncrementalIndex toPersist3 = new OnheapIncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
addDimValuesToIndex(toPersist1, "dimA", Arrays.asList("1", "2"));
addDimValuesToIndex(toPersist2, "dimA", Arrays.asList("1", "2"));
@@ -1027,10 +1028,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
for (IncrementalIndexSchema indexSchema : Arrays.asList(rollupIndexSchema, noRollupIndexSchema)) {
- IncrementalIndex toPersistA = new IncrementalIndex.Builder()
+ IncrementalIndex toPersistA = new OnheapIncrementalIndex.Builder()
.setIndexSchema(indexSchema)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
toPersistA.add(
new MapBasedInputRow(
@@ -1047,10 +1048,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
)
);
- IncrementalIndex toPersistB = new IncrementalIndex.Builder()
+ IncrementalIndex toPersistB = new OnheapIncrementalIndex.Builder()
.setIndexSchema(indexSchema)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
toPersistB.add(
new MapBasedInputRow(
@@ -1193,10 +1194,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
.withMetrics(new CountAggregatorFactory("count"))
.withRollup(false)
.build();
- IncrementalIndex toPersistA = new IncrementalIndex.Builder()
+ IncrementalIndex toPersistA = new OnheapIncrementalIndex.Builder()
.setIndexSchema(indexSchema)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
toPersistA.add(
new MapBasedInputRow(
@@ -1217,10 +1218,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
)
);
- IncrementalIndex toPersistB = new IncrementalIndex.Builder()
+ IncrementalIndex toPersistB = new OnheapIncrementalIndex.Builder()
.setIndexSchema(indexSchema)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
toPersistB.add(
new MapBasedInputRow(
@@ -1330,10 +1331,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
IncrementalIndex toPersistBA = getSingleDimIndex("dimB", Arrays.asList("1", "2", "3"));
addDimValuesToIndex(toPersistBA, "dimA", Arrays.asList("1", "2"));
- IncrementalIndex toPersistBA2 = new IncrementalIndex.Builder()
+ IncrementalIndex toPersistBA2 = new OnheapIncrementalIndex.Builder()
.setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
toPersistBA2.add(
new MapBasedInputRow(
@@ -1885,10 +1886,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
.withMetrics(new CountAggregatorFactory("count"))
.build();
- return new IncrementalIndex.Builder()
+ return new OnheapIncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
}
@@ -1932,10 +1933,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
private IncrementalIndex getIndexD3() throws Exception
{
- IncrementalIndex toPersist1 = new IncrementalIndex.Builder()
+ IncrementalIndex toPersist1 = new OnheapIncrementalIndex.Builder()
.setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
toPersist1.add(
new MapBasedInputRow(
@@ -1966,10 +1967,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
private IncrementalIndex getSingleDimIndex(String dimName, List<String> values) throws Exception
{
- IncrementalIndex toPersist1 = new IncrementalIndex.Builder()
+ IncrementalIndex toPersist1 = new OnheapIncrementalIndex.Builder()
.setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
addDimValuesToIndex(toPersist1, dimName, values);
return toPersist1;
@@ -1989,10 +1990,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
.withMetrics(new CountAggregatorFactory("count"))
.build();
- return new IncrementalIndex.Builder()
+ return new OnheapIncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
}
private AggregatorFactory[] getCombiningAggregators(AggregatorFactory[] aggregators)
@@ -2225,10 +2226,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
.withRollup(true)
.build();
- IncrementalIndex toPersistA = new IncrementalIndex.Builder()
+ IncrementalIndex toPersistA = new OnheapIncrementalIndex.Builder()
.setIndexSchema(indexSchema)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
Map<String, Object> event1 = new HashMap<>();
event1.put("dimA", "leek");
@@ -2243,10 +2244,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
toPersistA.add(new MapBasedInputRow(1, dims, event1));
toPersistA.add(new MapBasedInputRow(1, dims, event2));
- IncrementalIndex toPersistB = new IncrementalIndex.Builder()
+ IncrementalIndex toPersistB = new OnheapIncrementalIndex.Builder()
.setIndexSchema(indexSchema)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
Map<String, Object> event3 = new HashMap<>();
event3.put("dimA", "leek");
@@ -2472,10 +2473,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
multivalEvent9
);
- IncrementalIndex toPersistA = new IncrementalIndex.Builder()
+ IncrementalIndex toPersistA = new OnheapIncrementalIndex.Builder()
.setIndexSchema(indexSchema)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
for (Map<String, Object> event : events) {
toPersistA.add(new MapBasedInputRow(1, dims, event));
@@ -2488,10 +2489,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
List<QueryableIndex> singleEventIndexes = new ArrayList<>();
for (Map<String, Object> event : events) {
- IncrementalIndex toPersist = new IncrementalIndex.Builder()
+ IncrementalIndex toPersist = new OnheapIncrementalIndex.Builder()
.setIndexSchema(indexSchema)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
toPersist.add(new MapBasedInputRow(1, dims, event));
final File tmpDir = temporaryFolder.newFolder();
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerV9CompatibilityTest.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerV9CompatibilityTest.java
index 4baaddf..b1866ff 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexMergerV9CompatibilityTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerV9CompatibilityTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.ConciseBitmapSerdeFactory;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
@@ -133,7 +134,7 @@ public class IndexMergerV9CompatibilityTest
@Before
public void setUp() throws IOException
{
- toPersist = new IncrementalIndex.Builder()
+ toPersist = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(JodaUtils.MIN_INSTANT)
@@ -141,7 +142,7 @@ public class IndexMergerV9CompatibilityTest
.build()
)
.setMaxRowCount(1000000)
- .buildOnheap();
+ .build();
toPersist.getMetadata().put("key", "value");
for (InputRow event : events) {
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerV9WithSpatialIndexTest.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerV9WithSpatialIndexTest.java
index 7a0f8e7..8e5da94 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexMergerV9WithSpatialIndexTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerV9WithSpatialIndexTest.java
@@ -47,6 +47,7 @@ import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.joda.time.Interval;
@@ -101,7 +102,7 @@ public class IndexMergerV9WithSpatialIndexTest extends InitializedNullHandlingTe
private static IncrementalIndex makeIncrementalIndex() throws IOException
{
- IncrementalIndex theIndex = new IncrementalIndex.Builder()
+ IncrementalIndex theIndex = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DATA_INTERVAL.getStartMillis())
@@ -126,7 +127,7 @@ public class IndexMergerV9WithSpatialIndexTest extends InitializedNullHandlingTe
).build()
)
.setMaxRowCount(NUM_POINTS)
- .buildOnheap();
+ .build();
theIndex.add(
new MapBasedInputRow(
@@ -276,7 +277,7 @@ public class IndexMergerV9WithSpatialIndexTest extends InitializedNullHandlingTe
)
{
try {
- IncrementalIndex first = new IncrementalIndex.Builder()
+ IncrementalIndex first = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DATA_INTERVAL.getStartMillis())
@@ -301,9 +302,9 @@ public class IndexMergerV9WithSpatialIndexTest extends InitializedNullHandlingTe
).build()
)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
- IncrementalIndex second = new IncrementalIndex.Builder()
+ IncrementalIndex second = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DATA_INTERVAL.getStartMillis())
@@ -328,9 +329,9 @@ public class IndexMergerV9WithSpatialIndexTest extends InitializedNullHandlingTe
).build()
)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
- IncrementalIndex third = new IncrementalIndex.Builder()
+ IncrementalIndex third = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DATA_INTERVAL.getStartMillis())
@@ -355,7 +356,7 @@ public class IndexMergerV9WithSpatialIndexTest extends InitializedNullHandlingTe
).build()
)
.setMaxRowCount(NUM_POINTS)
- .buildOnheap();
+ .build();
first.add(
new MapBasedInputRow(
diff --git a/processing/src/test/java/org/apache/druid/segment/SchemalessIndexTest.java b/processing/src/test/java/org/apache/druid/segment/SchemalessIndexTest.java
index 6acd72a..7328061 100644
--- a/processing/src/test/java/org/apache/druid/segment/SchemalessIndexTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/SchemalessIndexTest.java
@@ -39,6 +39,7 @@ import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.timeline.Overshadowable;
@@ -149,7 +150,7 @@ public class SchemalessIndexTest
final long timestamp = new DateTime(event.get(TIMESTAMP), ISOChronology.getInstanceUTC()).getMillis();
if (theIndex == null) {
- theIndex = new IncrementalIndex.Builder()
+ theIndex = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(timestamp)
@@ -158,7 +159,7 @@ public class SchemalessIndexTest
.build()
)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
}
final List<String> dims = new ArrayList<>();
@@ -369,7 +370,7 @@ public class SchemalessIndexTest
}
}
- final IncrementalIndex rowIndex = new IncrementalIndex.Builder()
+ final IncrementalIndex rowIndex = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(timestamp)
@@ -378,7 +379,7 @@ public class SchemalessIndexTest
.build()
)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
rowIndex.add(
new MapBasedInputRow(timestamp, dims, event)
@@ -406,7 +407,7 @@ public class SchemalessIndexTest
String filename = resource.getFile();
log.info("Realtime loading index file[%s]", filename);
- final IncrementalIndex retVal = new IncrementalIndex.Builder()
+ final IncrementalIndex retVal = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DateTimes.of("2011-01-12T00:00:00.000Z").getMillis())
@@ -415,7 +416,7 @@ public class SchemalessIndexTest
.build()
)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
try {
final List<Object> events = JSON_MAPPER.readValue(new File(filename), List.class);
diff --git a/processing/src/test/java/org/apache/druid/segment/TestIndex.java b/processing/src/test/java/org/apache/druid/segment/TestIndex.java
index 58bf56c..020a4ec 100644
--- a/processing/src/test/java/org/apache/druid/segment/TestIndex.java
+++ b/processing/src/test/java/org/apache/druid/segment/TestIndex.java
@@ -49,6 +49,7 @@ import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
@@ -293,10 +294,10 @@ public class TestIndex
.withMetrics(METRIC_AGGS)
.withRollup(rollup)
.build();
- final IncrementalIndex retVal = new IncrementalIndex.Builder()
+ final IncrementalIndex retVal = new OnheapIncrementalIndex.Builder()
.setIndexSchema(schema)
.setMaxRowCount(10000)
- .buildOnheap();
+ .build();
try {
return loadIncrementalIndex(retVal, source);
diff --git a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
index 534034b..9b5d2f3 100644
--- a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.segment.data;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@@ -28,7 +29,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.druid.collections.CloseableStupidPool;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -37,7 +37,6 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.Sequence;
-import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.Druids;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.QueryPlus;
@@ -45,7 +44,6 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.Result;
-import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
@@ -63,21 +61,18 @@ import org.apache.druid.segment.CloserRule;
import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexCreator;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
-import org.apache.druid.segment.incremental.OffheapIncrementalIndex;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.joda.time.Interval;
-import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -96,70 +91,24 @@ import java.util.concurrent.atomic.AtomicInteger;
@RunWith(Parameterized.class)
public class IncrementalIndexTest extends InitializedNullHandlingTest
{
- interface IndexCreator
- {
- IncrementalIndex createIndex(AggregatorFactory[] aggregatorFactories);
- }
-
- private static final Closer RESOURCE_CLOSER = Closer.create();
-
- @AfterClass
- public static void teardown() throws IOException
- {
- RESOURCE_CLOSER.close();
- }
-
- private final IndexCreator indexCreator;
+ public final IncrementalIndexCreator indexCreator;
@Rule
- public final CloserRule closerRule = new CloserRule(false);
+ public final CloserRule closer = new CloserRule(false);
- public IncrementalIndexTest(IndexCreator indexCreator)
+ public IncrementalIndexTest(String indexType, String mode) throws JsonProcessingException
{
- this.indexCreator = indexCreator;
+ indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, (builder, args) -> builder
+ .setSimpleTestingIndexSchema("rollup".equals(mode), (AggregatorFactory[]) args[0])
+ .setMaxRowCount(1_000_000)
+ .build()
+ ));
}
- @Parameterized.Parameters
+ @Parameterized.Parameters(name = "{index}: {0}, {1}")
public static Collection<?> constructorFeeder()
{
- final List<Object[]> params = new ArrayList<>();
- params.add(new Object[] {(IndexCreator) IncrementalIndexTest::createIndex});
- final CloseableStupidPool<ByteBuffer> pool1 = new CloseableStupidPool<>(
- "OffheapIncrementalIndex-bufferPool",
- () -> ByteBuffer.allocate(256 * 1024)
- );
- RESOURCE_CLOSER.register(pool1);
- params.add(
- new Object[] {
- (IndexCreator) factories -> new OffheapIncrementalIndex.Builder()
- .setBufferPool(pool1)
- .setSimpleTestingIndexSchema(factories)
- .setMaxRowCount(1000000)
- .build()
- }
- );
- params.add(new Object[] {(IndexCreator) IncrementalIndexTest::createNoRollupIndex});
- final CloseableStupidPool<ByteBuffer> pool2 = new CloseableStupidPool<>(
- "OffheapIncrementalIndex-bufferPool",
- () -> ByteBuffer.allocate(256 * 1024)
- );
- RESOURCE_CLOSER.register(pool2);
- params.add(
- new Object[] {
- (IndexCreator) factories -> new OffheapIncrementalIndex.Builder()
- .setBufferPool(pool2)
- .setIndexSchema(
- new IncrementalIndexSchema.Builder()
- .withMetrics(factories)
- .withRollup(false)
- .build()
- )
- .setMaxRowCount(1000000)
- .build()
- }
- );
-
- return params;
+ return IncrementalIndexCreator.indexTypeCartesianProduct(ImmutableList.of("rollup", "plain"));
}
public static AggregatorFactory[] getDefaultCombiningAggregatorFactories()
@@ -268,7 +217,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
public void testCaseSensitivity() throws Exception
{
long timestamp = System.currentTimeMillis();
- IncrementalIndex index = closerRule.closeLater(indexCreator.createIndex(DEFAULT_AGGREGATOR_FACTORIES));
+ IncrementalIndex<?> index = indexCreator.createIndex((Object) DEFAULT_AGGREGATOR_FACTORIES);
populateIndex(timestamp, index);
Assert.assertEquals(Arrays.asList("dim1", "dim2"), index.getDimensionNames());
@@ -290,27 +239,25 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
public void testFilteredAggregators() throws Exception
{
long timestamp = System.currentTimeMillis();
- IncrementalIndex index = closerRule.closeLater(
- indexCreator.createIndex(new AggregatorFactory[]{
- new CountAggregatorFactory("count"),
- new FilteredAggregatorFactory(
- new CountAggregatorFactory("count_selector_filtered"),
- new SelectorDimFilter("dim2", "2", null)
- ),
- new FilteredAggregatorFactory(
- new CountAggregatorFactory("count_bound_filtered"),
- new BoundDimFilter("dim2", "2", "3", false, true, null, null, StringComparators.NUMERIC)
- ),
- new FilteredAggregatorFactory(
- new CountAggregatorFactory("count_multivaldim_filtered"),
- new SelectorDimFilter("dim3", "b", null)
- ),
- new FilteredAggregatorFactory(
- new CountAggregatorFactory("count_numeric_filtered"),
- new SelectorDimFilter("met1", "11", null)
- )
- })
- );
+ IncrementalIndex<?> index = indexCreator.createIndex((Object) new AggregatorFactory[]{
+ new CountAggregatorFactory("count"),
+ new FilteredAggregatorFactory(
+ new CountAggregatorFactory("count_selector_filtered"),
+ new SelectorDimFilter("dim2", "2", null)
+ ),
+ new FilteredAggregatorFactory(
+ new CountAggregatorFactory("count_bound_filtered"),
+ new BoundDimFilter("dim2", "2", "3", false, true, null, null, StringComparators.NUMERIC)
+ ),
+ new FilteredAggregatorFactory(
+ new CountAggregatorFactory("count_multivaldim_filtered"),
+ new SelectorDimFilter("dim3", "b", null)
+ ),
+ new FilteredAggregatorFactory(
+ new CountAggregatorFactory("count_numeric_filtered"),
+ new SelectorDimFilter("met1", "11", null)
+ )
+ });
index.add(
new MapBasedInputRow(
@@ -386,11 +333,9 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
);
}
- final IncrementalIndex index = closerRule.closeLater(
- indexCreator.createIndex(
- ingestAggregatorFactories.toArray(
- new AggregatorFactory[0]
- )
+ final IncrementalIndex<?> index = indexCreator.createIndex(
+ (Object) ingestAggregatorFactories.toArray(
+ new AggregatorFactory[0]
)
);
@@ -501,8 +446,8 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
}
- final IncrementalIndex index = closerRule.closeLater(
- indexCreator.createIndex(ingestAggregatorFactories.toArray(new AggregatorFactory[0]))
+ final IncrementalIndex<?> index = indexCreator.createIndex(
+ (Object) ingestAggregatorFactories.toArray(new AggregatorFactory[0])
);
final int concurrentThreads = 2;
final int elementsPerThread = 10_000;
@@ -679,7 +624,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
@Test
public void testConcurrentAdd() throws Exception
{
- final IncrementalIndex index = closerRule.closeLater(indexCreator.createIndex(DEFAULT_AGGREGATOR_FACTORIES));
+ final IncrementalIndex<?> index = indexCreator.createIndex((Object) DEFAULT_AGGREGATOR_FACTORIES);
final int threadCount = 10;
final int elementsPerThread = 200;
final int dimensionCount = 5;
@@ -725,22 +670,23 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
@Test
public void testgetDimensions()
{
- final IncrementalIndex<Aggregator> incrementalIndex = (OnheapIncrementalIndex) new OnheapIncrementalIndex.Builder()
- .setIndexSchema(
- new IncrementalIndexSchema.Builder()
- .withMetrics(new CountAggregatorFactory("count"))
- .withDimensionsSpec(
- new DimensionsSpec(
- DimensionsSpec.getDefaultSchemas(Arrays.asList("dim0", "dim1")),
- null,
- null
+ final IncrementalIndex<?> incrementalIndex = indexCreator.createIndex(
+ (builder, args) -> builder
+ .setIndexSchema(
+ new IncrementalIndexSchema.Builder()
+ .withMetrics(new CountAggregatorFactory("count"))
+ .withDimensionsSpec(
+ new DimensionsSpec(
+ DimensionsSpec.getDefaultSchemas(Arrays.asList("dim0", "dim1")),
+ null,
+ null
+ )
)
- )
- .build()
- )
- .setMaxRowCount(1000000)
- .build();
- closerRule.closeLater(incrementalIndex);
+ .build()
+ )
+ .setMaxRowCount(1000000)
+ .build()
+ );
Assert.assertEquals(Arrays.asList("dim0", "dim1"), incrementalIndex.getDimensionNames());
}
@@ -748,11 +694,13 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
@Test
public void testDynamicSchemaRollup() throws IndexSizeExceededException
{
- IncrementalIndex<Aggregator> index = (OnheapIncrementalIndex) new OnheapIncrementalIndex.Builder()
- .setSimpleTestingIndexSchema(/* empty */)
- .setMaxRowCount(10)
- .build();
- closerRule.closeLater(index);
+ final IncrementalIndex<?> index = indexCreator.createIndex(
+ (builder, args) -> builder
+ .setSimpleTestingIndexSchema(/* empty */)
+ .setMaxRowCount(10)
+ .build()
+ );
+
index.add(
new MapBasedInputRow(
1481871600000L,
diff --git a/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterBonusTest.java b/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterBonusTest.java
index 5cc8190..94e413c 100644
--- a/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterBonusTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterBonusTest.java
@@ -56,6 +56,7 @@ import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Interval;
import org.junit.Test;
@@ -114,7 +115,7 @@ public class SpatialFilterBonusTest
private static IncrementalIndex makeIncrementalIndex() throws IOException
{
- IncrementalIndex theIndex = new IncrementalIndex.Builder()
+ IncrementalIndex theIndex = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DATA_INTERVAL.getStartMillis())
@@ -134,7 +135,7 @@ public class SpatialFilterBonusTest
).build()
)
.setMaxRowCount(NUM_POINTS)
- .buildOnheap();
+ .build();
theIndex.add(
new MapBasedInputRow(
@@ -261,7 +262,7 @@ public class SpatialFilterBonusTest
)
{
try {
- IncrementalIndex first = new IncrementalIndex.Builder()
+ IncrementalIndex first = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DATA_INTERVAL.getStartMillis())
@@ -282,9 +283,9 @@ public class SpatialFilterBonusTest
).build()
)
.setMaxRowCount(NUM_POINTS)
- .buildOnheap();
+ .build();
- IncrementalIndex second = new IncrementalIndex.Builder()
+ IncrementalIndex second = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DATA_INTERVAL.getStartMillis())
@@ -304,9 +305,9 @@ public class SpatialFilterBonusTest
).build()
)
.setMaxRowCount(NUM_POINTS)
- .buildOnheap();
+ .build();
- IncrementalIndex third = new IncrementalIndex.Builder()
+ IncrementalIndex third = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DATA_INTERVAL.getStartMillis())
@@ -327,7 +328,7 @@ public class SpatialFilterBonusTest
).build()
)
.setMaxRowCount(NUM_POINTS)
- .buildOnheap();
+ .build();
first.add(
new MapBasedInputRow(
diff --git a/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterTest.java b/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterTest.java
index 8b961b6..47835bf 100644
--- a/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterTest.java
@@ -55,6 +55,7 @@ import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.joda.time.Interval;
@@ -113,7 +114,7 @@ public class SpatialFilterTest extends InitializedNullHandlingTest
private static IncrementalIndex makeIncrementalIndex() throws IOException
{
- IncrementalIndex theIndex = new IncrementalIndex.Builder()
+ IncrementalIndex theIndex = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DATA_INTERVAL.getStartMillis())
@@ -138,7 +139,7 @@ public class SpatialFilterTest extends InitializedNullHandlingTest
).build()
)
.setMaxRowCount(NUM_POINTS)
- .buildOnheap();
+ .build();
theIndex.add(
new MapBasedInputRow(
@@ -279,7 +280,7 @@ public class SpatialFilterTest extends InitializedNullHandlingTest
private static QueryableIndex makeMergedQueryableIndex(IndexSpec indexSpec)
{
try {
- IncrementalIndex first = new IncrementalIndex.Builder()
+ IncrementalIndex first = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DATA_INTERVAL.getStartMillis())
@@ -303,9 +304,9 @@ public class SpatialFilterTest extends InitializedNullHandlingTest
).build()
)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
- IncrementalIndex second = new IncrementalIndex.Builder()
+ IncrementalIndex second = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DATA_INTERVAL.getStartMillis())
@@ -329,9 +330,9 @@ public class SpatialFilterTest extends InitializedNullHandlingTest
).build()
)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
- IncrementalIndex third = new IncrementalIndex.Builder()
+ IncrementalIndex third = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(DATA_INTERVAL.getStartMillis())
@@ -355,7 +356,7 @@ public class SpatialFilterTest extends InitializedNullHandlingTest
).build()
)
.setMaxRowCount(NUM_POINTS)
- .buildOnheap();
+ .build();
first.add(
new MapBasedInputRow(
diff --git a/processing/src/test/java/org/apache/druid/segment/generator/DataGeneratorTest.java b/processing/src/test/java/org/apache/druid/segment/generator/DataGeneratorTest.java
index 814e8ae..0da7d87 100644
--- a/processing/src/test/java/org/apache/druid/segment/generator/DataGeneratorTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/generator/DataGeneratorTest.java
@@ -21,8 +21,17 @@ package org.apache.druid.segment.generator;
import org.apache.commons.math3.distribution.NormalDistribution;
import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
+import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Test;
@@ -34,7 +43,7 @@ import java.util.List;
import java.util.Map;
// Doesn't assert behavior right now, just generates rows and prints out some distribution numbers
-public class DataGeneratorTest
+public class DataGeneratorTest extends InitializedNullHandlingTest
{
@Test
public void testSequential()
@@ -538,4 +547,124 @@ public class DataGeneratorTest
}
}
}
+
+ @Test
+ public void testToList()
+ {
+ List<GeneratorColumnSchema> schemas = new ArrayList<>();
+ RowValueTracker tracker = new RowValueTracker();
+
+ schemas.add(
+ GeneratorColumnSchema.makeSequential(
+ "dimA",
+ ValueType.STRING,
+ false,
+ 1,
+ null,
+ 10,
+ 20
+ )
+ );
+
+ schemas.add(
+ GeneratorColumnSchema.makeEnumeratedSequential(
+ "dimB",
+ ValueType.STRING,
+ false,
+ 1,
+ null,
+ Arrays.asList("Hello", "World", "Foo", "Bar")
+ )
+ );
+
+ schemas.add(
+ GeneratorColumnSchema.makeSequential(
+ "dimC",
+ ValueType.STRING,
+ false,
+ 1,
+ 0.50,
+ 30,
+ 40
+ )
+ );
+
+ DataGenerator dataGenerator = new DataGenerator(schemas, 9999, 0, 0, 1000.0);
+ List<InputRow> rows = dataGenerator.toList(100);
+ Assert.assertEquals(100, rows.size());
+
+ for (InputRow row : rows) {
+ tracker.addRow(row);
+ }
+ tracker.printStuff();
+ }
+
+ @Test
+ public void testToIndex()
+ {
+ List<GeneratorColumnSchema> schemas = new ArrayList<>();
+
+ schemas.add(
+ GeneratorColumnSchema.makeSequential(
+ "dimA",
+ ValueType.STRING,
+ false,
+ 1,
+ null,
+ 10,
+ 20
+ )
+ );
+
+ schemas.add(
+ GeneratorColumnSchema.makeEnumeratedSequential(
+ "dimB",
+ ValueType.STRING,
+ false,
+ 1,
+ null,
+ Arrays.asList("Hello", "World", "Foo", "Bar")
+ )
+ );
+
+ schemas.add(
+ GeneratorColumnSchema.makeSequential(
+ "dimC",
+ ValueType.STRING,
+ false,
+ 1,
+ 0.50,
+ 30,
+ 40
+ )
+ );
+
+ DataGenerator dataGenerator = new DataGenerator(schemas, 9999, 0, 0, 1000.0);
+
+ DimensionsSpec dimensions = new DimensionsSpec(
+ Arrays.asList(
+ new StringDimensionSchema("dimA"),
+ new StringDimensionSchema("dimB"),
+ new StringDimensionSchema("dimC")
+ ), null, null
+ );
+ AggregatorFactory[] metrics = {
+ new CountAggregatorFactory("cnt")
+ };
+ final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
+ .withQueryGranularity(Granularities.MINUTE)
+ .withDimensionsSpec(dimensions)
+ .withMetrics(metrics)
+ .withRollup(false)
+ .build();
+
+ IncrementalIndex<?> index = new OnheapIncrementalIndex.Builder()
+ .setIndexSchema(schema)
+ .setSortFacts(false)
+ .setMaxRowCount(1_000_000)
+ .build();
+
+ dataGenerator.addToIndex(index, 100);
+ Assert.assertEquals(100, index.size());
+ }
}
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAdapterTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAdapterTest.java
index 32a26a5..e9c6139 100644
--- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAdapterTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAdapterTest.java
@@ -19,7 +19,10 @@
package org.apache.druid.segment.incremental;
+import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.segment.CloserRule;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.IndexableAdapter;
import org.apache.druid.segment.RowIterator;
@@ -31,12 +34,17 @@ import org.apache.druid.segment.data.ConciseBitmapSerdeFactory;
import org.apache.druid.segment.data.IncrementalIndexTest;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.function.Function;
+@RunWith(Parameterized.class)
public class IncrementalIndexAdapterTest extends InitializedNullHandlingTest
{
private static final IndexSpec INDEX_SPEC = new IndexSpec(
@@ -46,11 +54,31 @@ public class IncrementalIndexAdapterTest extends InitializedNullHandlingTest
CompressionFactory.LongEncodingStrategy.LONGS
);
+ public final IncrementalIndexCreator indexCreator;
+
+ @Rule
+ public final CloserRule closer = new CloserRule(false);
+
+ public IncrementalIndexAdapterTest(String indexType) throws JsonProcessingException
+ {
+ indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, (builder, args) -> builder
+ .setSimpleTestingIndexSchema("rollup".equals(args[0]), new CountAggregatorFactory("count"))
+ .setMaxRowCount(1_000_000)
+ .build()
+ ));
+ }
+
+ @Parameterized.Parameters(name = "{index}: {0}")
+ public static Collection<?> constructorFeeder()
+ {
+ return IncrementalIndexCreator.getAppendableIndexTypes();
+ }
+
@Test
public void testGetBitmapIndex() throws Exception
{
final long timestamp = System.currentTimeMillis();
- IncrementalIndex incrementalIndex = IncrementalIndexTest.createIndex(null);
+ IncrementalIndex<?> incrementalIndex = indexCreator.createIndex("rollup");
IncrementalIndexTest.populateIndex(timestamp, incrementalIndex);
IndexableAdapter adapter = new IncrementalIndexAdapter(
incrementalIndex.getInterval(),
@@ -70,7 +98,7 @@ public class IncrementalIndexAdapterTest extends InitializedNullHandlingTest
public void testGetRowsIterable() throws Exception
{
final long timestamp = System.currentTimeMillis();
- IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null);
+ IncrementalIndex<?> toPersist1 = indexCreator.createIndex("rollup");
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter(
@@ -94,7 +122,7 @@ public class IncrementalIndexAdapterTest extends InitializedNullHandlingTest
public void testGetRowsIterableNoRollup() throws Exception
{
final long timestamp = System.currentTimeMillis();
- IncrementalIndex toPersist1 = IncrementalIndexTest.createNoRollupIndex(null);
+ IncrementalIndex<?> toPersist1 = indexCreator.createIndex("plain");
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCreator.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCreator.java
new file mode 100644
index 0000000..3f112cc
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCreator.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.cfg.MapperConfig;
+import com.fasterxml.jackson.databind.introspect.AnnotatedClass;
+import com.fasterxml.jackson.databind.introspect.AnnotatedClassResolver;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.jsontype.SubtypeResolver;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.Closer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * This class handles the incremental-index lifecycle for testing.
+ * Any index created using this class during the test will be closed automatically once this class is closed.
+ *
+ * To allow testing multiple incremental-index implementations, this class can be instantiated with any
+ * {@code AppendableIndexSpec} instance.
+ * Alternatively, this class can instantiate an {@code AppendableIndexSpec} for you given the appendable-index type as
+ * a string.
+ * This allows tests' parameterization with the appendable-index types as strings.
+ *
+ * To further facilitate the tests' parameterization, this class supports listing all the available incremental-index
+ * implementations, and producing a cartesian product of many parameter options together with each incremental-index
+ * implementation.
+ */
+public class IncrementalIndexCreator implements Closeable
+{
+ public static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
+
+ /**
+ * Allows adding support for testing unregistered indexes.
+ * It is used by Druid's extensions for the incremental-index.
+ *
+ * @param c an index spec class
+ * @param name an index spec name
+ */
+ public static void addIndexSpec(Class<?> c, String name)
+ {
+ JSON_MAPPER.registerSubtypes(new NamedType(c, name));
+ }
+
+ static {
+ // The off-heap incremental-index is not registered for production, but we want to include it in the tests.
+ IncrementalIndexCreator.addIndexSpec(OffheapIncrementalIndexTestSpec.class, OffheapIncrementalIndexTestSpec.TYPE);
+ }
+
+ /**
+ * Fetch all the available incremental-index implementations.
+ * It can be used to parametrize the test. If more parameters are needed, use indexTypeCartesianProduct().
+ * @see #indexTypeCartesianProduct(Collection[])
+ *
+ * @return a list of all the incremental-index implementations types (String)
+ */
+ public static List<String> getAppendableIndexTypes()
+ {
+ SubtypeResolver resolver = JSON_MAPPER.getSubtypeResolver();
+ MapperConfig<?> config = JSON_MAPPER.getDeserializationConfig();
+ AnnotatedClass cls = AnnotatedClassResolver.resolveWithoutSuperTypes(config, AppendableIndexSpec.class);
+ Collection<NamedType> types = resolver.collectAndResolveSubtypesByClass(config, cls);
+ return types.stream().map(NamedType::getName).filter(Objects::nonNull).distinct().collect(Collectors.toList());
+ }
+
+ public interface IndexCreator
+ {
+ /**
+ * Build an index given a builder and args.
+ *
+ * @param builder an incremental index builder supplied by the framework
+ * @param args a list of arguments that are used to configure the builder
+ * @return a new instance of an incremental-index
+ */
+ IncrementalIndex<?> createIndex(AppendableIndexBuilder builder, Object... args);
+ }
+
+ private final Closer closer = Closer.create();
+
+ private final AppendableIndexSpec appendableIndexSpec;
+
+ private final IndexCreator indexCreator;
+
+ /**
+ * Initialize the creator.
+ *
+ * @param spec a spec that can generate an incremental-index builder
+ * @param indexCreator a function that generates an index given a builder and arguments
+ */
+ public IncrementalIndexCreator(AppendableIndexSpec spec, IndexCreator indexCreator)
+ {
+ this.appendableIndexSpec = spec;
+ this.indexCreator = indexCreator;
+ }
+
+ /**
+ * Initialize the creator.
+ *
+ * @param indexType an index type (name)
+ * @param indexCreator a function that generates an index given a builder and arguments
+ */
+ public IncrementalIndexCreator(String indexType, IndexCreator indexCreator) throws JsonProcessingException
+ {
+ this(parseIndexType(indexType), indexCreator);
+ }
+
+ /**
+ * Generate an AppendableIndexSpec from index type.
+ *
+ * @param indexType an index type
+ * @return AppendableIndexSpec instance of this type
+ * @throws JsonProcessingException if failed to parse the index
+ */
+ public static AppendableIndexSpec parseIndexType(String indexType) throws JsonProcessingException
+ {
+ return JSON_MAPPER.readValue(
+ StringUtils.format("{\"type\": \"%s\"}", indexType),
+ AppendableIndexSpec.class
+ );
+ }
+
+ /**
+ * Create an index given the input args.
+ *
+ * @param args The arguments for the index-generator
+ * @return An incremental-index instance
+ */
+ public final IncrementalIndex<?> createIndex(Object... args)
+ {
+ return createIndex(indexCreator, args);
+ }
+
+ /**
+ * Create an index given the input args with a specialized index-creator.
+ *
+ * @param args The arguments for the index-generator
+ * @return An incremental-index instance
+ */
+ public final IncrementalIndex<?> createIndex(IndexCreator indexCreator, Object... args)
+ {
+ return closer.register(indexCreator.createIndex(appendableIndexSpec.builder(), args));
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ closer.close();
+
+ if (appendableIndexSpec instanceof Closeable) {
+ ((Closeable) appendableIndexSpec).close();
+ }
+ }
+
+ /**
+ * Generates all the permutations of the parameters with each of the registered appendable index types.
+ * It is used to parameterize the tests with all the permutations of the parameters
+ * together with all the appendable index types.
+ *
+ * For example, for a parameterized test with the following constructor:
+ * {@code
+ * public IncrementalIndexTest(String indexType, String mode, boolean deserializeComplexMetrics)
+ * {
+ * ...
+ * }
+ * }
+ *
+ * we can test all the input combinations as follows:
+ * {@code
+ * @Parameterized.Parameters(name = "{index}: {0}, {1}, deserialize={2}")
+ * public static Collection<?> constructorFeeder()
+ * {
+ * return IncrementalIndexCreator.indexTypeCartesianProduct(
+ * ImmutableList.of("rollup", "plain"),
+ * ImmutableList.of(true, false)
+ * );
+ * }
+ * }
+ *
+ * @param c a list of collections of parameters
+ * @return the cartesian product of all parameters and appendable index types
+ */
+ public static List<Object[]> indexTypeCartesianProduct(Collection<?>... c)
+ {
+ Collection<?>[] args = new Collection<?>[c.length + 1];
+ args[0] = getAppendableIndexTypes();
+ System.arraycopy(c, 0, args, 1, c.length);
+ return cartesianProduct(args);
+ }
+
+ /**
+ * Generates all the permutations of the parameters.
+ *
+ * @param c a list of collections of parameters
+ * @return the cartesian product of all parameters
+ */
+ private static List<Object[]> cartesianProduct(Collection<?>... c)
+ {
+ final ArrayList<Object[]> res = new ArrayList<>();
+ final int curLength = c.length;
+
+ if (curLength == 0) {
+ res.add(new Object[0]);
+ return res;
+ }
+
+ final int curItem = curLength - 1;
+ for (Object[] objList : cartesianProduct(Arrays.copyOfRange(c, 0, curItem))) {
+ for (Object o : c[curItem]) {
+ Object[] newObjList = Arrays.copyOf(objList, curLength);
+ newObjList[curItem] = o;
+ res.add(newObjList);
+ }
+ }
+
+ return res;
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java
similarity index 68%
rename from processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java
rename to processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java
index d475a57..5886f7c 100644
--- a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.segment.incremental;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.MapBasedInputRow;
@@ -30,32 +31,54 @@ import org.apache.druid.query.aggregation.LongMaxAggregator;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.segment.CloserRule;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.easymock.EasyMock;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import java.util.Collection;
import java.util.Collections;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
-public class OnheapIncrementalIndexTest extends InitializedNullHandlingTest
+@RunWith(Parameterized.class)
+public class IncrementalIndexIngestionTest extends InitializedNullHandlingTest
{
- private static final int MAX_ROWS = 100000;
+ private static final int MAX_ROWS = 100_000;
+
+ public final IncrementalIndexCreator indexCreator;
+
+ @Rule
+ public final CloserRule closer = new CloserRule(false);
+
+ public IncrementalIndexIngestionTest(String indexType) throws JsonProcessingException
+ {
+ indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, (builder, args) -> builder
+ .setIndexSchema((IncrementalIndexSchema) args[0])
+ .setMaxRowCount(MAX_ROWS)
+ .build()
+ ));
+ }
+
+ @Parameterized.Parameters(name = "{index}: {0}")
+ public static Collection<?> constructorFeeder()
+ {
+ return IncrementalIndexCreator.getAppendableIndexTypes();
+ }
@Test
public void testMultithreadAddFacts() throws Exception
{
- final IncrementalIndex index = new IncrementalIndex.Builder()
- .setIndexSchema(
- new IncrementalIndexSchema.Builder()
- .withQueryGranularity(Granularities.MINUTE)
- .withMetrics(new LongMaxAggregatorFactory("max", "max"))
- .build()
- )
- .setMaxRowCount(MAX_ROWS)
- .buildOnheap();
+ final IncrementalIndex<?> index = indexCreator.createIndex(new IncrementalIndexSchema.Builder()
+ .withQueryGranularity(Granularities.MINUTE)
+ .withMetrics(new LongMaxAggregatorFactory("max", "max"))
+ .build()
+ );
final int addThreadCount = 2;
Thread[] addThreads = new Thread[addThreadCount];
@@ -111,39 +134,33 @@ public class OnheapIncrementalIndexTest extends InitializedNullHandlingTest
@Test
public void testMultithreadAddFactsUsingExpressionAndJavaScript() throws Exception
{
- final IncrementalIndex indexExpr = new IncrementalIndex.Builder()
- .setIndexSchema(
- new IncrementalIndexSchema.Builder()
- .withQueryGranularity(Granularities.MINUTE)
- .withMetrics(new LongSumAggregatorFactory(
- "oddnum",
- null,
- "if(value%2==1,1,0)",
- TestExprMacroTable.INSTANCE
- ))
- .withRollup(true)
- .build()
- )
- .setMaxRowCount(MAX_ROWS)
- .buildOnheap();
-
- final IncrementalIndex indexJs = new IncrementalIndex.Builder()
- .setIndexSchema(
- new IncrementalIndexSchema.Builder()
- .withQueryGranularity(Granularities.MINUTE)
- .withMetrics(new JavaScriptAggregatorFactory(
- "oddnum",
- ImmutableList.of("value"),
- "function(current, value) { if (value%2==1) current = current + 1; return current;}",
- "function() {return 0;}",
- "function(a, b) { return a + b;}",
- JavaScriptConfig.getEnabledInstance()
- ))
- .withRollup(true)
- .build()
- )
- .setMaxRowCount(MAX_ROWS)
- .buildOnheap();
+ final IncrementalIndex<?> indexExpr = indexCreator.createIndex(
+ new IncrementalIndexSchema.Builder()
+ .withQueryGranularity(Granularities.MINUTE)
+ .withMetrics(new LongSumAggregatorFactory(
+ "oddnum",
+ null,
+ "if(value%2==1,1,0)",
+ TestExprMacroTable.INSTANCE
+ ))
+ .withRollup(true)
+ .build()
+ );
+
+ final IncrementalIndex<?> indexJs = indexCreator.createIndex(
+ new IncrementalIndexSchema.Builder()
+ .withQueryGranularity(Granularities.MINUTE)
+ .withMetrics(new JavaScriptAggregatorFactory(
+ "oddnum",
+ ImmutableList.of("value"),
+ "function(current, value) { if (value%2==1) current = current + 1; return current;}",
+ "function() {return 0;}",
+ "function(a, b) { return a + b;}",
+ JavaScriptConfig.getEnabledInstance()
+ ))
+ .withRollup(true)
+ .build()
+ );
final int addThreadCount = 2;
Thread[] addThreads = new Thread[addThreadCount];
@@ -205,15 +222,19 @@ public class OnheapIncrementalIndexTest extends InitializedNullHandlingTest
mockedAggregator.close();
EasyMock.expectLastCall().times(1);
- final OnheapIncrementalIndex index = (OnheapIncrementalIndex) new IncrementalIndex.Builder()
- .setIndexSchema(
- new IncrementalIndexSchema.Builder()
- .withQueryGranularity(Granularities.MINUTE)
- .withMetrics(new LongMaxAggregatorFactory("max", "max"))
- .build()
- )
- .setMaxRowCount(MAX_ROWS)
- .buildOnheap();
+ final IncrementalIndex<?> genericIndex = indexCreator.createIndex(
+ new IncrementalIndexSchema.Builder()
+ .withQueryGranularity(Granularities.MINUTE)
+ .withMetrics(new LongMaxAggregatorFactory("max", "max"))
+ .build()
+ );
+
+ // This test is specific to the on-heap index
+ if (!(genericIndex instanceof OnheapIncrementalIndex)) {
+ return;
+ }
+
+ final OnheapIncrementalIndex index = (OnheapIncrementalIndex) genericIndex;
index.add(new MapBasedInputRow(
0,
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexMultiValueSpecTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexMultiValueSpecTest.java
index dfd386f..24aae3a 100644
--- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexMultiValueSpecTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexMultiValueSpecTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.segment.incremental;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Lists;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.Row;
@@ -28,19 +29,45 @@ import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.CloserRule;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
/**
*/
+@RunWith(Parameterized.class)
public class IncrementalIndexMultiValueSpecTest extends InitializedNullHandlingTest
{
+ public final IncrementalIndexCreator indexCreator;
+
+ @Rule
+ public final CloserRule closer = new CloserRule(false);
+
+ public IncrementalIndexMultiValueSpecTest(String indexType) throws JsonProcessingException
+ {
+ indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, (builder, args) -> builder
+ .setIndexSchema((IncrementalIndexSchema) args[0])
+ .setMaxRowCount(10_000)
+ .build()
+ ));
+ }
+
+ @Parameterized.Parameters(name = "{index}: {0}")
+ public static Collection<?> constructorFeeder()
+ {
+ return IncrementalIndexCreator.getAppendableIndexTypes();
+ }
+
@Test
public void test() throws IndexSizeExceededException
{
@@ -78,10 +105,7 @@ public class IncrementalIndexMultiValueSpecTest extends InitializedNullHandlingT
return null;
}
};
- IncrementalIndex<?> index = new IncrementalIndex.Builder()
- .setIndexSchema(schema)
- .setMaxRowCount(10000)
- .buildOnheap();
+ IncrementalIndex<?> index = indexCreator.createIndex(schema);
index.add(
new MapBasedInputRow(
0,
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexRowCompTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexRowCompTest.java
index f6f95e0..166b332 100644
--- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexRowCompTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexRowCompTest.java
@@ -19,29 +19,54 @@
package org.apache.druid.segment.incremental;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Lists;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.segment.CloserRule;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
/**
*/
+@RunWith(Parameterized.class)
public class IncrementalIndexRowCompTest extends InitializedNullHandlingTest
{
+ public final IncrementalIndexCreator indexCreator;
+
+ @Rule
+ public final CloserRule closer = new CloserRule(false);
+
+ public IncrementalIndexRowCompTest(String indexType) throws JsonProcessingException
+ {
+ indexCreator = closer.closeLater(
+ new IncrementalIndexCreator(indexType, (builder, args) -> builder
+ .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
+ .setMaxRowCount(1_000)
+ .build())
+ );
+ }
+
+ @Parameterized.Parameters(name = "{index}: {0}")
+ public static Collection<?> constructorFeeder()
+ {
+ return IncrementalIndexCreator.getAppendableIndexTypes();
+ }
+
@Test
public void testBasic()
{
- IncrementalIndex<?> index = new IncrementalIndex.Builder()
- .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
- .setMaxRowCount(1000)
- .buildOnheap();
+ IncrementalIndex<?> index = indexCreator.createIndex();
long time = System.currentTimeMillis();
IncrementalIndexRow ir1 = index.toIncrementalIndexRow(toMapRow(time, "billy", "A", "joe", "B")).getIncrementalIndexRow();
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexRowSizeTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexRowSizeTest.java
index 865789e..cfd2e1a 100644
--- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexRowSizeTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexRowSizeTest.java
@@ -19,30 +19,55 @@
package org.apache.druid.segment.incremental;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Lists;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.segment.CloserRule;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
/**
*/
+@RunWith(Parameterized.class)
public class IncrementalIndexRowSizeTest extends InitializedNullHandlingTest
{
+ public final IncrementalIndexCreator indexCreator;
+
+ @Rule
+ public final CloserRule closer = new CloserRule(false);
+
+ public IncrementalIndexRowSizeTest(String indexType) throws JsonProcessingException
+ {
+ indexCreator = closer.closeLater(
+ new IncrementalIndexCreator(indexType, (builder, args) -> builder
+ .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
+ .setMaxRowCount(10_000)
+ .setMaxBytesInMemory(1_000)
+ .build())
+ );
+ }
+
+ @Parameterized.Parameters(name = "{index}: {0}")
+ public static Collection<?> constructorFeeder()
+ {
+ return IncrementalIndexCreator.getAppendableIndexTypes();
+ }
+
@Test
public void testIncrementalIndexRowSizeBasic()
{
- IncrementalIndex index = new IncrementalIndex.Builder()
- .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
- .setMaxRowCount(10000)
- .setMaxBytesInMemory(1000)
- .buildOnheap();
+ IncrementalIndex<?> index = indexCreator.createIndex();
long time = System.currentTimeMillis();
IncrementalIndex.IncrementalIndexRowResult tndResult = index.toIncrementalIndexRow(toMapRow(
time,
@@ -59,11 +84,7 @@ public class IncrementalIndexRowSizeTest extends InitializedNullHandlingTest
@Test
public void testIncrementalIndexRowSizeArr()
{
- IncrementalIndex index = new IncrementalIndex.Builder()
- .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
- .setMaxRowCount(10000)
- .setMaxBytesInMemory(1000)
- .buildOnheap();
+ IncrementalIndex<?> index = indexCreator.createIndex();
long time = System.currentTimeMillis();
IncrementalIndex.IncrementalIndexRowResult tndResult = index.toIncrementalIndexRow(toMapRow(
time + 1,
@@ -80,11 +101,7 @@ public class IncrementalIndexRowSizeTest extends InitializedNullHandlingTest
@Test
public void testIncrementalIndexRowSizeComplex()
{
- IncrementalIndex index = new IncrementalIndex.Builder()
- .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
- .setMaxRowCount(10000)
- .setMaxBytesInMemory(1000)
- .buildOnheap();
+ IncrementalIndex<?> index = indexCreator.createIndex();
long time = System.currentTimeMillis();
IncrementalIndex.IncrementalIndexRowResult tndResult = index.toIncrementalIndexRow(toMapRow(
time + 1,
@@ -101,11 +118,7 @@ public class IncrementalIndexRowSizeTest extends InitializedNullHandlingTest
@Test
public void testIncrementalIndexRowSizeEmptyString()
{
- IncrementalIndex index = new IncrementalIndex.Builder()
- .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
- .setMaxRowCount(10000)
- .setMaxBytesInMemory(1000)
- .buildOnheap();
+ IncrementalIndex<?> index = indexCreator.createIndex();
long time = System.currentTimeMillis();
IncrementalIndex.IncrementalIndexRowResult tndResult = index.toIncrementalIndexRow(toMapRow(
time + 1,
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java
index ec03a9e..6168719 100644
--- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.segment.incremental;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Suppliers;
@@ -55,6 +56,7 @@ import org.apache.druid.query.groupby.GroupByQueryEngine;
import org.apache.druid.query.topn.TopNQueryBuilder;
import org.apache.druid.query.topn.TopNQueryEngine;
import org.apache.druid.query.topn.TopNResultValue;
+import org.apache.druid.segment.CloserRule;
import org.apache.druid.segment.ColumnSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Cursor;
@@ -68,6 +70,7 @@ import org.apache.druid.testing.InitializedNullHandlingTest;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -86,46 +89,30 @@ import java.util.concurrent.atomic.AtomicInteger;
@RunWith(Parameterized.class)
public class IncrementalIndexStorageAdapterTest extends InitializedNullHandlingTest
{
- interface IndexCreator
- {
- IncrementalIndex createIndex();
- }
+ public final IncrementalIndexCreator indexCreator;
- private final IndexCreator indexCreator;
+ @Rule
+ public final CloserRule closer = new CloserRule(false);
- public IncrementalIndexStorageAdapterTest(
- IndexCreator IndexCreator
- )
+ public IncrementalIndexStorageAdapterTest(String indexType) throws JsonProcessingException
{
- this.indexCreator = IndexCreator;
+ indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, (builder, args) -> builder
+ .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
+ .setMaxRowCount(1_000)
+ .build()
+ ));
}
- @Parameterized.Parameters
+ @Parameterized.Parameters(name = "{index}: {0}")
public static Collection<?> constructorFeeder()
{
- return Arrays.asList(
- new Object[][]{
- {
- new IndexCreator()
- {
- @Override
- public IncrementalIndex createIndex()
- {
- return new IncrementalIndex.Builder()
- .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
- .setMaxRowCount(1000)
- .buildOnheap();
- }
- }
- }
- }
- );
+ return IncrementalIndexCreator.getAppendableIndexTypes();
}
@Test
public void testSanity() throws Exception
{
- IncrementalIndex index = indexCreator.createIndex();
+ IncrementalIndex<?> index = indexCreator.createIndex();
index.add(
new MapBasedInputRow(
System.currentTimeMillis() - 1,
@@ -189,7 +176,7 @@ public class IncrementalIndexStorageAdapterTest extends InitializedNullHandlingT
@Test
public void testObjectColumnSelectorOnVaryingColumnSchema() throws Exception
{
- IncrementalIndex index = indexCreator.createIndex();
+ IncrementalIndex<?> index = indexCreator.createIndex();
index.add(
new MapBasedInputRow(
DateTimes.of("2014-09-01T00:00:00"),
@@ -271,7 +258,7 @@ public class IncrementalIndexStorageAdapterTest extends InitializedNullHandlingT
public void testResetSanity() throws IOException
{
- IncrementalIndex index = indexCreator.createIndex();
+ IncrementalIndex<?> index = indexCreator.createIndex();
DateTime t = DateTimes.nowUtc();
Interval interval = new Interval(t.minusMinutes(1), t.plusMinutes(1));
@@ -331,7 +318,7 @@ public class IncrementalIndexStorageAdapterTest extends InitializedNullHandlingT
@Test
public void testSingleValueTopN() throws IOException
{
- IncrementalIndex index = indexCreator.createIndex();
+ IncrementalIndex<?> index = indexCreator.createIndex();
DateTime t = DateTimes.nowUtc();
index.add(
new MapBasedInputRow(
@@ -373,7 +360,7 @@ public class IncrementalIndexStorageAdapterTest extends InitializedNullHandlingT
@Test
public void testFilterByNull() throws Exception
{
- IncrementalIndex index = indexCreator.createIndex();
+ IncrementalIndex<?> index = indexCreator.createIndex();
index.add(
new MapBasedInputRow(
System.currentTimeMillis() - 1,
@@ -434,7 +421,7 @@ public class IncrementalIndexStorageAdapterTest extends InitializedNullHandlingT
@Test
public void testCursoringAndIndexUpdationInterleaving() throws Exception
{
- final IncrementalIndex index = indexCreator.createIndex();
+ final IncrementalIndex<?> index = indexCreator.createIndex();
final long timestamp = System.currentTimeMillis();
for (int i = 0; i < 2; i++) {
@@ -498,7 +485,7 @@ public class IncrementalIndexStorageAdapterTest extends InitializedNullHandlingT
{
// Tests the dictionary ID race condition bug described at https://github.com/apache/druid/pull/6340
- final IncrementalIndex index = indexCreator.createIndex();
+ final IncrementalIndex<?> index = indexCreator.createIndex();
final long timestamp = System.currentTimeMillis();
for (int i = 0; i < 5; i++) {
@@ -549,7 +536,7 @@ public class IncrementalIndexStorageAdapterTest extends InitializedNullHandlingT
@Test
public void testCursoringAndSnapshot() throws Exception
{
- final IncrementalIndex index = indexCreator.createIndex();
+ final IncrementalIndex<?> index = indexCreator.createIndex();
final long timestamp = System.currentTimeMillis();
for (int i = 0; i < 2; i++) {
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java
index b8b4e70..4a6b38b 100644
--- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java
@@ -19,10 +19,10 @@
package org.apache.druid.segment.incremental;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
-import org.apache.druid.collections.CloseableStupidPool;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.DoubleDimensionSchema;
@@ -31,7 +31,6 @@ import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -39,53 +38,38 @@ import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.segment.CloserRule;
import org.apache.druid.testing.InitializedNullHandlingTest;
-import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.List;
/**
*/
@RunWith(Parameterized.class)
public class IncrementalIndexTest extends InitializedNullHandlingTest
{
- interface IndexCreator
- {
- IncrementalIndex createIndex();
- }
-
- @Rule
- public ExpectedException expectedException = ExpectedException.none();
+ public final IncrementalIndexCreator indexCreator;
@Rule
- public final CloserRule closerRule = new CloserRule(false);
-
- private final IndexCreator indexCreator;
- private final Closer resourceCloser;
+ public final CloserRule closer = new CloserRule(false);
- @After
- public void teardown() throws IOException
+ public IncrementalIndexTest(String indexType, String mode, boolean deserializeComplexMetrics,
+ IncrementalIndexSchema schema) throws JsonProcessingException
{
- resourceCloser.close();
- }
-
- public IncrementalIndexTest(IndexCreator IndexCreator, Closer resourceCloser)
- {
- this.indexCreator = IndexCreator;
- this.resourceCloser = resourceCloser;
+ indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, (builder, args) -> builder
+ .setIndexSchema(schema)
+ .setDeserializeComplexMetrics(deserializeComplexMetrics)
+ .setSortFacts("rollup".equals(mode))
+ .setMaxRowCount(1_000_000)
+ .build())
+ );
}
- @Parameterized.Parameters
+ @Parameterized.Parameters(name = "{index}: {0}, {1}, deserialize={2}")
public static Collection<?> constructorFeeder()
{
DimensionsSpec dimensions = new DimensionsSpec(
@@ -108,59 +92,17 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
.withMetrics(metrics)
.build();
- final List<Object[]> constructors = new ArrayList<>();
- for (final Boolean sortFacts : ImmutableList.of(false, true)) {
- constructors.add(
- new Object[]{
- new IndexCreator()
- {
- @Override
- public IncrementalIndex createIndex()
- {
- return new OnheapIncrementalIndex.Builder()
- .setIndexSchema(schema)
- .setDeserializeComplexMetrics(false)
- .setSortFacts(sortFacts)
- .setMaxRowCount(1000)
- .build();
- }
- },
- Closer.create()
- }
- );
- final Closer poolCloser = Closer.create();
- final CloseableStupidPool<ByteBuffer> stupidPool = new CloseableStupidPool<>(
- "OffheapIncrementalIndex-bufferPool",
- () -> ByteBuffer.allocate(256 * 1024)
- );
- poolCloser.register(stupidPool);
- constructors.add(
- new Object[]{
- new IndexCreator()
- {
- @Override
- public IncrementalIndex createIndex()
- {
- return new OffheapIncrementalIndex.Builder()
- .setBufferPool(stupidPool)
- .setIndexSchema(schema)
- .setSortFacts(sortFacts)
- .setMaxRowCount(1000000)
- .build();
- }
- },
- poolCloser
- }
- );
- }
-
- return constructors;
+ return IncrementalIndexCreator.indexTypeCartesianProduct(
+ ImmutableList.of("rollup", "plain"),
+ ImmutableList.of(true, false),
+ ImmutableList.of(schema)
+ );
}
@Test(expected = ISE.class)
public void testDuplicateDimensions() throws IndexSizeExceededException
{
- IncrementalIndex index = closerRule.closeLater(indexCreator.createIndex());
+ IncrementalIndex<?> index = indexCreator.createIndex();
index.add(
new MapBasedInputRow(
System.currentTimeMillis() - 1,
@@ -180,7 +122,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
@Test(expected = ISE.class)
public void testDuplicateDimensionsFirstOccurrence() throws IndexSizeExceededException
{
- IncrementalIndex index = closerRule.closeLater(indexCreator.createIndex());
+ IncrementalIndex<?> index = indexCreator.createIndex();
index.add(
new MapBasedInputRow(
System.currentTimeMillis() - 1,
@@ -193,7 +135,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
@Test
public void controlTest() throws IndexSizeExceededException
{
- IncrementalIndex index = closerRule.closeLater(indexCreator.createIndex());
+ IncrementalIndex<?> index = indexCreator.createIndex();
index.add(
new MapBasedInputRow(
System.currentTimeMillis() - 1,
@@ -220,7 +162,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
@Test
public void testUnparseableNumerics() throws IndexSizeExceededException
{
- IncrementalIndex<?> index = closerRule.closeLater(indexCreator.createIndex());
+ IncrementalIndex<?> index = indexCreator.createIndex();
IncrementalIndexAddResult result;
result = index.add(
@@ -286,7 +228,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
Lists.newArrayList("billy", "joe"),
ImmutableMap.of("billy", "A", "joe", "B")
);
- IncrementalIndex index = closerRule.closeLater(indexCreator.createIndex());
+ IncrementalIndex<?> index = indexCreator.createIndex();
index.add(row);
index.add(row);
index.add(row);
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/OffheapIncrementalIndexTestSpec.java b/processing/src/test/java/org/apache/druid/segment/incremental/OffheapIncrementalIndexTestSpec.java
new file mode 100644
index 0000000..9259731
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/OffheapIncrementalIndexTestSpec.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Supplier;
+import org.apache.druid.collections.CloseableStupidPool;
+import org.apache.druid.utils.JvmUtils;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+
+/**
+ * Test-only {@link AppendableIndexSpec} for the off-heap incremental index.
+ * The spec is also the {@link ByteBuffer} supplier of the buffer pool it hands to each index builder.
+ *
+ * Note: the off-heap incremental index is not yet supported in production ingestion, so its spec is defined here
+ * only for testing purposes. Closing the spec closes its buffer pool.
+ */
+public class OffheapIncrementalIndexTestSpec implements AppendableIndexSpec, Supplier<ByteBuffer>, Closeable
+{
+  public static final String TYPE = "offheap";
+  static final int DEFAULT_BUFFER_SIZE = 1 << 23; // 8 MiB
+  static final int DEFAULT_CACHE_SIZE = 1 << 30; // 1 GiB
+
+  final int bufferSize;
+  final int cacheSize;
+
+  final CloseableStupidPool<ByteBuffer> bufferPool;
+
+  @JsonCreator
+  public OffheapIncrementalIndexTestSpec(
+      final @JsonProperty("bufferSize") @Nullable Integer bufferSize,
+      final @JsonProperty("cacheSize") @Nullable Integer cacheSize
+  )
+  {
+    // A missing or non-positive buffer size selects the default.
+    this.bufferSize = (bufferSize == null || bufferSize <= 0) ? DEFAULT_BUFFER_SIZE : bufferSize;
+    // A missing cache size, or one that does not exceed the buffer size, selects the default.
+    this.cacheSize = (cacheSize == null || cacheSize <= this.bufferSize) ? DEFAULT_CACHE_SIZE : cacheSize;
+    final int poolLimit = this.cacheSize / this.bufferSize;
+    // "this" is the pool's supplier; NOTE(review): 0 and poolLimit look like initial/max cached counts - confirm.
+    this.bufferPool = new CloseableStupidPool<>("Off-heap incremental-index buffer pool", this, 0, poolLimit);
+  }
+
+  @JsonProperty
+  public int getBufferSize()
+  {
+    return bufferSize;
+  }
+
+  @JsonProperty
+  public int getCacheSize()
+  {
+    return cacheSize;
+  }
+
+  @Override
+  public AppendableIndexBuilder builder()
+  {
+    final OffheapIncrementalIndex.Builder offheapBuilder = new OffheapIncrementalIndex.Builder();
+    return offheapBuilder.setBufferPool(bufferPool); // every builder draws from this spec's own pool
+  }
+
+  @Override
+  public long getDefaultMaxBytesInMemory()
+  {
+    // In the realtime node, the entire JVM's direct memory is utilized for ingestion and persist operations,
+    // while maxBytesInMemory covers only the active index (not the index being flushed or the persist buffer).
+    // To account for that, the default is half of the JVM's maximum direct memory.
+    final long directMemoryBytes = JvmUtils.getRuntimeInfo().getDirectMemorySizeBytes();
+    return directMemoryBytes / 2;
+  }
+
+  // Supplier<ByteBuffer> and Closeable interface implementation
+
+  @Override
+  public ByteBuffer get()
+  {
+    return ByteBuffer.allocateDirect(bufferSize); // a new direct buffer of bufferSize bytes on each call
+  }
+
+  @Override
+  public void close()
+  {
+    bufferPool.close(); // closes the pool created in the constructor
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
index 5bd1a59..64da13d 100644
--- a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
@@ -48,6 +48,7 @@ import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Test;
@@ -360,7 +361,7 @@ public class ExpressionSelectorsTest extends InitializedNullHandlingTest
true
);
- IncrementalIndex index = new IncrementalIndex.Builder().setMaxRowCount(100).setIndexSchema(schema).buildOnheap();
+ IncrementalIndex index = new OnheapIncrementalIndex.Builder().setMaxRowCount(100).setIndexSchema(schema).build();
index.add(
new MapBasedInputRow(
DateTimes.nowUtc().getMillis(),
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
index 1bb58ab..10a7ec9 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
@@ -46,6 +46,7 @@ import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
@@ -126,7 +127,7 @@ public class IngestSegmentFirehoseTest
try (
final QueryableIndex qi = indexIO.loadIndex(segmentDir);
- final IncrementalIndex index = new IncrementalIndex.Builder()
+ final IncrementalIndex index = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withDimensionsSpec(DIMENSIONS_SPEC_REINDEX)
@@ -134,7 +135,7 @@ public class IngestSegmentFirehoseTest
.build()
)
.setMaxRowCount(5000)
- .buildOnheap()
+ .build()
) {
final StorageAdapter sa = new QueryableIndexStorageAdapter(qi);
final WindowedStorageAdapter wsa = new WindowedStorageAdapter(sa, sa.getInterval());
@@ -216,7 +217,7 @@ public class IngestSegmentFirehoseTest
);
try (
- final IncrementalIndex index = new IncrementalIndex.Builder()
+ final IncrementalIndex index = new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withDimensionsSpec(parser.getParseSpec().getDimensionsSpec())
@@ -224,7 +225,7 @@ public class IngestSegmentFirehoseTest
.build()
)
.setMaxRowCount(5000)
- .buildOnheap()
+ .build()
) {
for (String line : rows) {
index.add(parser.parse(line));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org