Posted to commits@druid.apache.org by ma...@apache.org on 2022/04/08 18:02:11 UTC
[druid] branch master updated: Add a new flag for ingestion to preserve existing metrics (#12185)
This is an automated email from the ASF dual-hosted git repository.
maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 8edea5a82d Add a new flag for ingestion to preserve existing metrics (#12185)
8edea5a82d is described below
commit 8edea5a82dfdb34c3177c0efd7a79bae54751ab8
Author: Maytas Monsereenusorn <ma...@apache.org>
AuthorDate: Fri Apr 8 11:02:02 2022 -0700
Add a new flag for ingestion to preserve existing metrics (#12185)
* add impl
* add impl
* fix checkstyle
* add impl
* add unit test
* fix stuff
* fix stuff
* fix stuff
* add unit test
* add more unit tests
* add more unit tests
* add IT
* add IT
* add IT
* add IT
* add ITs
* address comments
* fix test
* fix test
* fix test
* address comments
* address comments
* address comments
* fix conflict
* fix checkstyle
* address comments
* fix test
* fix checkstyle
* fix test
* fix test
* fix IT
---
.../apache/druid/data/input/InputRowSchema.java | 27 ++
.../druid/indexing/input/DruidInputSource.java | 35 ++-
.../druid/indexing/input/InputRowSchemas.java | 6 +-
.../task/ClientCompactionTaskQuerySerdeTest.java | 5 +-
.../druid/indexing/input/DruidInputSourceTest.java | 78 ++++++
.../druid/indexing/input/InputRowSchemasTest.java | 71 +++++
integration-tests/pom.xml | 5 +
.../apache/druid/testing/utils/CompactionUtil.java | 1 +
.../coordinator/duty/ITAutoCompactionTest.java | 297 ++++++++++++++++++---
.../duty/ITAutoCompactionUpgradeTest.java | 1 +
.../wikipedia_index_no_rollup_preserve_metric.json | 76 ++++++
.../wikipedia_index_rollup_preserve_metric.json | 95 +++++++
.../indexer/wikipedia_index_sketch_queries.json | 49 ++++
.../incremental/AppendableIndexBuilder.java | 21 +-
.../segment/incremental/IncrementalIndex.java | 10 +
.../incremental/OnheapIncrementalIndex.java | 165 ++++++++++--
.../druid/segment/data/IncrementalIndexTest.java | 285 +++++++++++++++++++-
.../incremental/IncrementalIndexAdapterTest.java | 2 +-
.../OnheapIncrementalIndexBenchmark.java | 2 +
.../ClientCompactionTaskQueryTuningConfig.java | 25 +-
.../UserCompactionTaskQueryTuningConfig.java | 3 +
.../DataSourceCompactionConfigTest.java | 44 +++
.../UserCompactionTaskQueryTuningConfigTest.java | 3 +
.../coordinator/duty/CompactSegmentsTest.java | 14 +
.../duty/NewestSegmentFirstIteratorTest.java | 8 +
.../duty/NewestSegmentFirstPolicyTest.java | 109 +++++++-
26 files changed, 1358 insertions(+), 79 deletions(-)
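Before the per-file diffs, a quick orientation: judging from the serde test changes in this commit, compaction picks the flag up through the tuning config's appendableIndexSpec, built as new OnheapIncrementalIndex.Spec(true). A minimal sketch follows; only the Spec constructor call is taken from the diff, while the wrapper class and the reading of the boolean as "preserve existing metrics" are illustrative assumptions.

import org.apache.druid.segment.incremental.OnheapIncrementalIndex;

public class PreserveMetricsSpecSketch
{
    public static void main(String[] args)
    {
        // Passed as the appendableIndexSpec of the compaction tuning configs below;
        // true is interpreted here as "preserve existing metrics" (an inference from this commit).
        OnheapIncrementalIndex.Spec appendableIndexSpec = new OnheapIncrementalIndex.Spec(true);
        System.out.println(appendableIndexSpec);
    }
}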
diff --git a/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java b/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java
index 227bd3a6d1..3c4263ba99 100644
--- a/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java
+++ b/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java
@@ -19,9 +19,13 @@
package org.apache.druid.data.input;
+import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
+import javax.validation.constraints.NotNull;
+import java.util.Set;
+
/**
* Schema of {@link InputRow}.
*/
@@ -30,16 +34,39 @@ public class InputRowSchema
private final TimestampSpec timestampSpec;
private final DimensionsSpec dimensionsSpec;
private final ColumnsFilter columnsFilter;
+ /**
+ * Set of metric names for further downstream processing by {@link InputSource}.
+ * Empty set if no metrics are given.
+ */
+ @NotNull
+ private final Set<String> metricNames;
public InputRowSchema(
final TimestampSpec timestampSpec,
final DimensionsSpec dimensionsSpec,
final ColumnsFilter columnsFilter
)
+ {
+ this(timestampSpec, dimensionsSpec, columnsFilter, ImmutableSet.of());
+ }
+
+ public InputRowSchema(
+ final TimestampSpec timestampSpec,
+ final DimensionsSpec dimensionsSpec,
+ final ColumnsFilter columnsFilter,
+ final Set<String> metricNames
+ )
{
this.timestampSpec = timestampSpec;
this.dimensionsSpec = dimensionsSpec;
this.columnsFilter = columnsFilter;
+ this.metricNames = metricNames == null ? ImmutableSet.of() : metricNames;
+ }
+
+ @NotNull
+ public Set<String> getMetricNames()
+ {
+ return metricNames;
}
public TimestampSpec getTimestampSpec()
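A minimal sketch of the new four-argument InputRowSchema constructor; every call below also appears in this commit's tests, while the dimension, column, and metric names ("a", "b", "c1", "m1") are placeholders.

import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;

import java.util.Arrays;

public class InputRowSchemaWithMetricsSketch
{
    public static void main(String[] args)
    {
        InputRowSchema schema = new InputRowSchema(
            new TimestampSpec("timestamp", "auto", null),
            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("a", "b"))),
            ColumnsFilter.inclusionBased(ImmutableSet.of("c1")),
            ImmutableSet.of("m1")   // metric names; a null argument falls back to the empty set
        );
        System.out.println(schema.getMetricNames());   // [m1]
    }
}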
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
index ff1dd617f1..6f1a6d6446 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
@@ -23,12 +23,14 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.data.input.AbstractInputSource;
+import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputFileAttribute;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
@@ -241,8 +243,26 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
final DruidSegmentInputFormat inputFormat = new DruidSegmentInputFormat(indexIO, dimFilter);
+ return new InputEntityIteratingReader(
+ getInputRowSchemaToUse(inputRowSchema),
+ inputFormat,
+ entityIterator,
+ temporaryDirectory
+ );
+ }
+
+ @VisibleForTesting
+ InputRowSchema getInputRowSchemaToUse(InputRowSchema inputRowSchema)
+ {
final InputRowSchema inputRowSchemaToUse;
+ ColumnsFilter columnsFilterToUse = inputRowSchema.getColumnsFilter();
+ if (inputRowSchema.getMetricNames() != null) {
+ for (String metricName : inputRowSchema.getMetricNames()) {
+ columnsFilterToUse = columnsFilterToUse.plus(metricName);
+ }
+ }
+
if (taskConfig.isIgnoreTimestampSpecForDruidInputSource()) {
// Legacy compatibility mode; see https://github.com/apache/druid/pull/10267.
LOG.warn("Ignoring the provided timestampSpec and reading the __time column instead. To use timestampSpecs with "
@@ -251,10 +271,14 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
inputRowSchemaToUse = new InputRowSchema(
new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, STANDARD_TIME_COLUMN_FORMATS.iterator().next(), null),
inputRowSchema.getDimensionsSpec(),
- inputRowSchema.getColumnsFilter().plus(ColumnHolder.TIME_COLUMN_NAME)
+ columnsFilterToUse.plus(ColumnHolder.TIME_COLUMN_NAME)
);
} else {
- inputRowSchemaToUse = inputRowSchema;
+ inputRowSchemaToUse = new InputRowSchema(
+ inputRowSchema.getTimestampSpec(),
+ inputRowSchema.getDimensionsSpec(),
+ columnsFilterToUse
+ );
}
if (ColumnHolder.TIME_COLUMN_NAME.equals(inputRowSchemaToUse.getTimestampSpec().getTimestampColumn())
@@ -268,12 +292,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
);
}
- return new InputEntityIteratingReader(
- inputRowSchemaToUse,
- inputFormat,
- entityIterator,
- temporaryDirectory
- );
+ return inputRowSchemaToUse;
}
private List<TimelineObjectHolder<String, DataSegment>> createTimeline()
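The net effect of getInputRowSchemaToUse() on the columns filter, retraced as a standalone sketch; inclusionBased(), plus(), and apply() are the same calls used above, and the concrete names are made up.

import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.ColumnsFilter;

public class ColumnsFilterWithMetricsSketch
{
    public static void main(String[] args)
    {
        // Start from the caller-supplied filter (hypothetical column "c1") ...
        ColumnsFilter filter = ColumnsFilter.inclusionBased(ImmutableSet.of("c1"));
        // ... then fold in every metric name from the InputRowSchema, as getInputRowSchemaToUse() does,
        // so the segment reader also keeps the pre-aggregated metric columns.
        for (String metricName : ImmutableSet.of("m1", "m2")) {
            filter = filter.plus(metricName);
        }
        System.out.println(filter.apply("c1"));   // true
        System.out.println(filter.apply("m1"));   // true
        System.out.println(filter.apply("x"));    // false: not part of the inclusion-based filter
    }
}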
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/InputRowSchemas.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/InputRowSchemas.java
index f273be7922..c895eb14b7 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/InputRowSchemas.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/InputRowSchemas.java
@@ -29,6 +29,7 @@ import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.transform.Transform;
import org.apache.druid.segment.transform.TransformSpec;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
@@ -56,7 +57,10 @@ public class InputRowSchemas
dataSchema.getDimensionsSpec(),
dataSchema.getTransformSpec(),
dataSchema.getAggregators()
- )
+ ),
+ Arrays.stream(dataSchema.getAggregators())
+ .map(AggregatorFactory::getName)
+ .collect(Collectors.toSet())
);
}
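For reference, the metric-name extraction added to InputRowSchemas.fromDataSchema() boils down to the stream below, shown with the two aggregators the new tests use; the standalone wrapper is illustrative only.

import com.google.common.collect.ImmutableSet;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;

import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

public class MetricNamesFromAggregatorsSketch
{
    public static void main(String[] args)
    {
        AggregatorFactory[] aggregators = new AggregatorFactory[]{
            new CountAggregatorFactory("count"),
            new LongSumAggregatorFactory("sum_added", "added")
        };
        // Same extraction as above: one metric name per aggregator, collected into a set.
        Set<String> metricNames = Arrays.stream(aggregators)
                                        .map(AggregatorFactory::getName)
                                        .collect(Collectors.toSet());
        System.out.println(metricNames.equals(ImmutableSet.of("count", "sum_added")));   // true
    }
}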
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
index d2ed8b31e5..29952ebc1c 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
@@ -57,6 +57,7 @@ import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.BitmapSerde.DefaultBitmapSerdeFactory;
import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
@@ -93,6 +94,7 @@ public class ClientCompactionTaskQuerySerdeTest
true
),
new ClientCompactionTaskQueryTuningConfig(
+ null,
null,
40000,
2000L,
@@ -249,7 +251,7 @@ public class ClientCompactionTaskQuerySerdeTest
new ParallelIndexTuningConfig(
null,
null,
- null,
+ new OnheapIncrementalIndex.Spec(true),
40000,
2000L,
null,
@@ -313,6 +315,7 @@ public class ClientCompactionTaskQuerySerdeTest
),
new ClientCompactionTaskQueryTuningConfig(
100,
+ new OnheapIncrementalIndex.Spec(true),
40000,
2000L,
30000L,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java
index ebc2b94f32..2989415a5e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java
@@ -23,8 +23,13 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.data.input.ColumnsFilter;
+import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.guice.IndexingServiceInputSourceModule;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
@@ -35,12 +40,15 @@ import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.TestHelper;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
+import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import java.util.Arrays;
+
public class DruidInputSourceTest
{
private final IndexIO indexIO = EasyMock.createMock(IndexIO.class);
@@ -221,4 +229,74 @@ public class DruidInputSourceTest
mapper.readValue(json, InputSource.class);
}
+
+ @Test
+ public void testReaderColumnsFilterWithMetricGiven()
+ {
+ String datasource = "foo";
+ Interval interval = Intervals.of("2000/2001");
+ String column = "c1";
+ String metricName = "m1";
+ ColumnsFilter originalColumnsFilter = ColumnsFilter.inclusionBased(ImmutableSet.of(column));
+ InputRowSchema inputRowSchema = new InputRowSchema(
+ new TimestampSpec("timestamp", "auto", null),
+ new DimensionsSpec(
+ DimensionsSpec.getDefaultSchemas(Arrays.asList("timestamp", "a", "b"))
+ ),
+ originalColumnsFilter,
+ ImmutableSet.of(metricName)
+ );
+ DruidInputSource druidInputSource = new DruidInputSource(
+ datasource,
+ interval,
+ null,
+ null,
+ ImmutableList.of("a"),
+ ImmutableList.of("b"),
+ indexIO,
+ coordinatorClient,
+ segmentCacheManagerFactory,
+ retryPolicyFactory,
+ taskConfig
+ );
+ InputRowSchema inputSourceReader = druidInputSource.getInputRowSchemaToUse(inputRowSchema);
+ ColumnsFilter columnsFilter = inputSourceReader.getColumnsFilter();
+ Assert.assertTrue(columnsFilter.apply(column));
+ Assert.assertTrue(columnsFilter.apply(metricName));
+ }
+
+ @Test
+ public void testReaderColumnsFilterWithNoMetricGiven()
+ {
+ String datasource = "foo";
+ Interval interval = Intervals.of("2000/2001");
+ String column = "c1";
+ String metricName = "m1";
+ ColumnsFilter originalColumnsFilter = ColumnsFilter.inclusionBased(ImmutableSet.of(column));
+ InputRowSchema inputRowSchema = new InputRowSchema(
+ new TimestampSpec("timestamp", "auto", null),
+ new DimensionsSpec(
+ DimensionsSpec.getDefaultSchemas(Arrays.asList("timestamp", "a", "b"))
+ ),
+ originalColumnsFilter,
+ ImmutableSet.of()
+ );
+ DruidInputSource druidInputSource = new DruidInputSource(
+ datasource,
+ interval,
+ null,
+ null,
+ ImmutableList.of("a"),
+ ImmutableList.of("b"),
+ indexIO,
+ coordinatorClient,
+ segmentCacheManagerFactory,
+ retryPolicyFactory,
+ taskConfig
+ );
+ InputRowSchema inputSourceReader = druidInputSource.getInputRowSchemaToUse(inputRowSchema);
+ ColumnsFilter columnsFilter = inputSourceReader.getColumnsFilter();
+ Assert.assertTrue(columnsFilter.apply(column));
+ Assert.assertFalse(columnsFilter.apply(metricName));
+ }
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/InputRowSchemasTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/InputRowSchemasTest.java
index ae6ed8816d..991a5950f9 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/input/InputRowSchemasTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/InputRowSchemasTest.java
@@ -23,18 +23,28 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.NullHandlingTest;
import org.apache.druid.data.input.ColumnsFilter;
+import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.DoubleDimensionSchema;
+import org.apache.druid.data.input.impl.FloatDimensionSchema;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.junit.Assert;
import org.junit.Test;
+import java.util.Arrays;
+
public class InputRowSchemasTest extends NullHandlingTest
{
@Test
@@ -98,4 +108,65 @@ public class InputRowSchemasTest extends NullHandlingTest
columnsFilter
);
}
+
+ @Test
+ public void testFromDataSchema()
+ {
+ TimestampSpec timestampSpec = new TimestampSpec(null, null, null);
+ DimensionsSpec dimensionsSpec = new DimensionsSpec(
+ Arrays.asList(
+ new StringDimensionSchema("d1"),
+ new StringDimensionSchema("d2"),
+ new LongDimensionSchema("d3"),
+ new FloatDimensionSchema("d4"),
+ new DoubleDimensionSchema("d5")
+ )
+ );
+ DataSchema schema = new DataSchema(
+ "dataSourceName",
+ new TimestampSpec(null, null, null),
+ dimensionsSpec,
+ new AggregatorFactory[]{
+ new CountAggregatorFactory("count"),
+ new LongSumAggregatorFactory("met", "met")
+ },
+ new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null),
+ null
+ );
+
+ InputRowSchema inputRowSchema = InputRowSchemas.fromDataSchema(schema);
+ Assert.assertEquals(timestampSpec, inputRowSchema.getTimestampSpec());
+ Assert.assertEquals(dimensionsSpec.getDimensions(), inputRowSchema.getDimensionsSpec().getDimensions());
+ Assert.assertEquals(dimensionsSpec.getDimensionNames(), inputRowSchema.getDimensionsSpec().getDimensionNames());
+ Assert.assertEquals(ImmutableSet.of("count", "met"), inputRowSchema.getMetricNames());
+ }
+
+ @Test
+ public void testFromDataSchemaWithNoAggregator()
+ {
+ TimestampSpec timestampSpec = new TimestampSpec(null, null, null);
+ DimensionsSpec dimensionsSpec = new DimensionsSpec(
+ Arrays.asList(
+ new StringDimensionSchema("d1"),
+ new StringDimensionSchema("d2"),
+ new LongDimensionSchema("d3"),
+ new FloatDimensionSchema("d4"),
+ new DoubleDimensionSchema("d5")
+ )
+ );
+ DataSchema schema = new DataSchema(
+ "dataSourceName",
+ new TimestampSpec(null, null, null),
+ dimensionsSpec,
+ new AggregatorFactory[]{},
+ new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null),
+ null
+ );
+
+ InputRowSchema inputRowSchema = InputRowSchemas.fromDataSchema(schema);
+ Assert.assertEquals(timestampSpec, inputRowSchema.getTimestampSpec());
+ Assert.assertEquals(dimensionsSpec.getDimensions(), inputRowSchema.getDimensionsSpec().getDimensions());
+ Assert.assertEquals(dimensionsSpec.getDimensionNames(), inputRowSchema.getDimensionsSpec().getDimensionNames());
+ Assert.assertEquals(ImmutableSet.of(), inputRowSchema.getMetricNames());
+ }
}
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index fb378b766e..71e6add527 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -432,6 +432,11 @@
<version>${aws.sdk.version}</version>
<scope>runtime</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.datasketches</groupId>
+ <artifactId>datasketches-java</artifactId>
+ <scope>runtime</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java
index 463c4b2c0b..accaec0146 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java
@@ -53,6 +53,7 @@ public class CompactionUtil
null,
null,
null,
+ null,
new MaxSizeSplitHintSpec(null, 1),
new DynamicPartitionsSpec(maxRowsPerSegment, null),
null,
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index 21e34ca8da..83e0ac9109 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.commons.io.IOUtils;
+import org.apache.datasketches.hll.TgtHllType;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.TaskState;
@@ -41,8 +42,12 @@ import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggregatorFactory;
+import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory;
+import org.apache.druid.query.aggregation.datasketches.theta.SketchMergeAggregatorFactory;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
@@ -90,7 +95,10 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
private static final String INDEX_TASK_WITH_GRANULARITY_SPEC = "/indexer/wikipedia_index_task_with_granularity_spec.json";
private static final String INDEX_TASK_WITH_DIMENSION_SPEC = "/indexer/wikipedia_index_task_with_dimension_spec.json";
private static final String INDEX_ROLLUP_QUERIES_RESOURCE = "/indexer/wikipedia_index_rollup_queries.json";
+ private static final String INDEX_ROLLUP_SKETCH_QUERIES_RESOURCE = "/indexer/wikipedia_index_sketch_queries.json";
private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
+ private static final String INDEX_TASK_WITH_ROLLUP_FOR_PRESERVE_METRICS = "/indexer/wikipedia_index_rollup_preserve_metric.json";
+ private static final String INDEX_TASK_WITHOUT_ROLLUP_FOR_PRESERVE_METRICS = "/indexer/wikipedia_index_no_rollup_preserve_metric.json";
private static final int MAX_ROWS_PER_SEGMENT_COMPACTED = 10000;
private static final Period NO_SKIP_OFFSET = Period.seconds(0);
@@ -110,6 +118,226 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
fullDatasourceName = "wikipedia_index_test_" + UUID.randomUUID() + config.getExtraDatasourceNameSuffix();
}
+ @Test
+ public void testAutoCompactionRowWithMetricAndRowWithoutMetricShouldPreserveExistingMetrics() throws Exception
+ {
+ // added = null, count = 2, sum_added = 62, quantilesDoublesSketch = 2, thetaSketch = 2, HLLSketchBuild = 2
+ loadData(INDEX_TASK_WITH_ROLLUP_FOR_PRESERVE_METRICS);
+ // added = 31, count = null, sum_added = null, quantilesDoublesSketch = null, thetaSketch = null, HLLSketchBuild = null
+ loadData(INDEX_TASK_WITHOUT_ROLLUP_FOR_PRESERVE_METRICS);
+ try (final Closeable ignored = unloader(fullDatasourceName)) {
+ final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
+ intervalsBeforeCompaction.sort(null);
+ // 2 segments across 1 day...
+ verifySegmentsCount(2);
+ ArrayList<Object> nullList = new ArrayList<Object>();
+ nullList.add(null);
+ Map<String, Object> queryAndResultFields = ImmutableMap.of(
+ "%%FIELD_TO_QUERY%%", "added",
+ "%%EXPECTED_COUNT_RESULT%%", 2,
+ "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(nullList)), ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(31))))
+ );
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+ queryAndResultFields = ImmutableMap.of(
+ "%%FIELD_TO_QUERY%%", "count",
+ "%%EXPECTED_COUNT_RESULT%%", 2,
+ "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(2))), ImmutableMap.of("events", ImmutableList.of(nullList)))
+ );
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+ queryAndResultFields = ImmutableMap.of(
+ "%%FIELD_TO_QUERY%%", "sum_added",
+ "%%EXPECTED_COUNT_RESULT%%", 2,
+ "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(62))), ImmutableMap.of("events", ImmutableList.of(nullList)))
+ );
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+ queryAndResultFields = ImmutableMap.of(
+ "%%QUANTILESRESULT%%", 2,
+ "%%THETARESULT%%", 2.0,
+ "%%HLLRESULT%%", 2
+ );
+ verifyQuery(INDEX_ROLLUP_SKETCH_QUERIES_RESOURCE, queryAndResultFields);
+
+ submitCompactionConfig(
+ MAX_ROWS_PER_SEGMENT_COMPACTED,
+ NO_SKIP_OFFSET,
+ new UserCompactionTaskGranularityConfig(null, null, true),
+ new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("language"))),
+ null,
+ new AggregatorFactory[]{
+ new CountAggregatorFactory("count"),
+ new LongSumAggregatorFactory("sum_added", "added"),
+ new SketchMergeAggregatorFactory("thetaSketch", "user", 16384, true, false, null),
+ new HllSketchBuildAggregatorFactory("HLLSketchBuild", "user", 12, TgtHllType.HLL_4.name(), false),
+ new DoublesSketchAggregatorFactory("quantilesDoublesSketch", "delta", 128, 1000000000L)
+ },
+ false
+ );
+ // should now only have 1 row after compaction
+ // added = null, count = 3, sum_added = 93
+ forceTriggerAutoCompaction(1);
+
+ queryAndResultFields = ImmutableMap.of(
+ "%%FIELD_TO_QUERY%%", "added",
+ "%%EXPECTED_COUNT_RESULT%%", 1,
+ "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(nullList)))
+ );
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+ queryAndResultFields = ImmutableMap.of(
+ "%%FIELD_TO_QUERY%%", "count",
+ "%%EXPECTED_COUNT_RESULT%%", 1,
+ "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(3))))
+ );
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+ queryAndResultFields = ImmutableMap.of(
+ "%%FIELD_TO_QUERY%%", "sum_added",
+ "%%EXPECTED_COUNT_RESULT%%", 1,
+ "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(93))))
+ );
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+ queryAndResultFields = ImmutableMap.of(
+ "%%QUANTILESRESULT%%", 3,
+ "%%THETARESULT%%", 3.0,
+ "%%HLLRESULT%%", 3
+ );
+ verifyQuery(INDEX_ROLLUP_SKETCH_QUERIES_RESOURCE, queryAndResultFields);
+
+ verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
+ checkCompactionIntervals(intervalsBeforeCompaction);
+
+ List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+ // Verify rollup segments do not get compacted again
+ forceTriggerAutoCompaction(1);
+ List<TaskResponseObject> compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+ Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size());
+ }
+ }
+
+ @Test
+ public void testAutoCompactionOnlyRowsWithoutMetricShouldAddNewMetrics() throws Exception
+ {
+ // added = 31, count = null, sum_added = null
+ loadData(INDEX_TASK_WITHOUT_ROLLUP_FOR_PRESERVE_METRICS);
+ // added = 31, count = null, sum_added = null
+ loadData(INDEX_TASK_WITHOUT_ROLLUP_FOR_PRESERVE_METRICS);
+ try (final Closeable ignored = unloader(fullDatasourceName)) {
+ final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
+ intervalsBeforeCompaction.sort(null);
+ // 2 segments across 1 day...
+ verifySegmentsCount(2);
+ ArrayList<Object> nullList = new ArrayList<Object>();
+ nullList.add(null);
+ Map<String, Object> queryAndResultFields = ImmutableMap.of(
+ "%%FIELD_TO_QUERY%%", "added",
+ "%%EXPECTED_COUNT_RESULT%%", 2,
+ "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(31))), ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(31))))
+ );
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+
+ submitCompactionConfig(
+ MAX_ROWS_PER_SEGMENT_COMPACTED,
+ NO_SKIP_OFFSET,
+ new UserCompactionTaskGranularityConfig(null, null, true),
+ new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("language"))),
+ null,
+ new AggregatorFactory[] {new CountAggregatorFactory("count"), new LongSumAggregatorFactory("sum_added", "added")},
+ false
+ );
+ // should now only have 1 row after compaction
+ // added = null, count = 2, sum_added = 62
+ forceTriggerAutoCompaction(1);
+
+ queryAndResultFields = ImmutableMap.of(
+ "%%FIELD_TO_QUERY%%", "added",
+ "%%EXPECTED_COUNT_RESULT%%", 1,
+ "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(nullList)))
+ );
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+ queryAndResultFields = ImmutableMap.of(
+ "%%FIELD_TO_QUERY%%", "count",
+ "%%EXPECTED_COUNT_RESULT%%", 1,
+ "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(2))))
+ );
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+ queryAndResultFields = ImmutableMap.of(
+ "%%FIELD_TO_QUERY%%", "sum_added",
+ "%%EXPECTED_COUNT_RESULT%%", 1,
+ "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(62))))
+ );
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+
+ verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
+ checkCompactionIntervals(intervalsBeforeCompaction);
+
+ List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+ // Verify rollup segments do not get compacted again
+ forceTriggerAutoCompaction(1);
+ List<TaskResponseObject> compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+ Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size());
+ }
+ }
+
+ @Test
+ public void testAutoCompactionOnlyRowsWithMetricShouldPreserveExistingMetrics() throws Exception
+ {
+ // added = null, count = 2, sum_added = 62
+ loadData(INDEX_TASK_WITH_ROLLUP_FOR_PRESERVE_METRICS);
+ // added = null, count = 2, sum_added = 62
+ loadData(INDEX_TASK_WITH_ROLLUP_FOR_PRESERVE_METRICS);
+ try (final Closeable ignored = unloader(fullDatasourceName)) {
+ final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
+ intervalsBeforeCompaction.sort(null);
+ // 2 segments across 1 day...
+ verifySegmentsCount(2);
+ Map<String, Object> queryAndResultFields = ImmutableMap.of(
+ "%%FIELD_TO_QUERY%%", "count",
+ "%%EXPECTED_COUNT_RESULT%%", 2,
+ "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(2))), ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(2))))
+ );
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+ queryAndResultFields = ImmutableMap.of(
+ "%%FIELD_TO_QUERY%%", "sum_added",
+ "%%EXPECTED_COUNT_RESULT%%", 2,
+ "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(62))), ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(62))))
+ );
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+
+ submitCompactionConfig(
+ MAX_ROWS_PER_SEGMENT_COMPACTED,
+ NO_SKIP_OFFSET,
+ new UserCompactionTaskGranularityConfig(null, null, true),
+ new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("language"))),
+ null,
+ new AggregatorFactory[] {new CountAggregatorFactory("count"), new LongSumAggregatorFactory("sum_added", "added")},
+ false
+ );
+ // should now only have 1 row after compaction
+ // added = null, count = 4, sum_added = 124
+ forceTriggerAutoCompaction(1);
+
+ queryAndResultFields = ImmutableMap.of(
+ "%%FIELD_TO_QUERY%%", "count",
+ "%%EXPECTED_COUNT_RESULT%%", 1,
+ "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(4))))
+ );
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+ queryAndResultFields = ImmutableMap.of(
+ "%%FIELD_TO_QUERY%%", "sum_added",
+ "%%EXPECTED_COUNT_RESULT%%", 1,
+ "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(124))))
+ );
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+
+ verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
+ checkCompactionIntervals(intervalsBeforeCompaction);
+
+ List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+ // Verify rollup segments do not get compacted again
+ forceTriggerAutoCompaction(1);
+ List<TaskResponseObject> compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+ Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size());
+ }
+ }
+
@Test
public void testAutoCompactionDutySubmitAndVerifyCompaction() throws Exception
{
@@ -646,12 +874,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
Map<String, Object> specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.MONTH, Granularities.DAY, false, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs);
try (final Closeable ignored = unloader(fullDatasourceName)) {
- Map<String, Object> expectedResult = ImmutableMap.of(
+ Map<String, Object> queryAndResultFields = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "added",
"%%EXPECTED_COUNT_RESULT%%", 2,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
);
- verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
submitCompactionConfig(
MAX_ROWS_PER_SEGMENT_COMPACTED,
NO_SKIP_OFFSET,
@@ -667,12 +895,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
// does not have data on every week on the month
forceTriggerAutoCompaction(3);
// Make sure that no data is lost after compaction
- expectedResult = ImmutableMap.of(
+ queryAndResultFields = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "added",
"%%EXPECTED_COUNT_RESULT%%", 2,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
);
- verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
List<TaskResponseObject> tasks = indexer.getCompleteTasksForDataSource(fullDatasourceName);
TaskResponseObject compactTask = null;
@@ -696,12 +924,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
Map<String, Object> specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.WEEK, Granularities.DAY, false, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs);
try (final Closeable ignored = unloader(fullDatasourceName)) {
- Map<String, Object> expectedResult = ImmutableMap.of(
+ Map<String, Object> queryAndResultFields = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "added",
"%%EXPECTED_COUNT_RESULT%%", 2,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
);
- verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
submitCompactionConfig(
MAX_ROWS_PER_SEGMENT_COMPACTED,
NO_SKIP_OFFSET,
@@ -714,12 +942,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
// we expect the compaction task's interval to align with the MONTH segmentGranularity (2013-08-01 to 2013-10-01)
forceTriggerAutoCompaction(2);
// Make sure that no data is lost after compaction
- expectedResult = ImmutableMap.of(
+ queryAndResultFields = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "added",
"%%EXPECTED_COUNT_RESULT%%", 2,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
);
- verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
List<TaskResponseObject> tasks = indexer.getCompleteTasksForDataSource(fullDatasourceName);
TaskResponseObject compactTask = null;
@@ -742,12 +970,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
Map<String, Object> specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.DAY, Granularities.DAY, false, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs);
try (final Closeable ignored = unloader(fullDatasourceName)) {
- Map<String, Object> expectedResult = ImmutableMap.of(
+ Map<String, Object> queryAndResultFields = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "added",
"%%EXPECTED_COUNT_RESULT%%", 2,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
);
- verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
submitCompactionConfig(
MAX_ROWS_PER_SEGMENT_COMPACTED,
NO_SKIP_OFFSET,
@@ -755,12 +983,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
false
);
forceTriggerAutoCompaction(2);
- expectedResult = ImmutableMap.of(
+ queryAndResultFields = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "added",
"%%EXPECTED_COUNT_RESULT%%", 1,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(516.0))))
);
- verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
@@ -778,12 +1006,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
Map<String, Object> specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, true, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs);
try (final Closeable ignored = unloader(fullDatasourceName)) {
- Map<String, Object> expectedResult = ImmutableMap.of(
+ Map<String, Object> queryAndResultFields = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "added",
"%%EXPECTED_COUNT_RESULT%%", 2,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
);
- verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
submitCompactionConfig(
MAX_ROWS_PER_SEGMENT_COMPACTED,
NO_SKIP_OFFSET,
@@ -791,12 +1019,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
false
);
forceTriggerAutoCompaction(2);
- expectedResult = ImmutableMap.of(
+ queryAndResultFields = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "added",
"%%EXPECTED_COUNT_RESULT%%", 1,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(516.0))))
);
- verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
@@ -820,12 +1048,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifySegmentsCount(4);
// Result is not rollup
- Map<String, Object> expectedResult = ImmutableMap.of(
+ Map<String, Object> queryAndResultFields = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "added",
"%%EXPECTED_COUNT_RESULT%%", 2,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
);
- verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
// Compact and change dimension to only "language"
submitCompactionConfig(
@@ -840,12 +1068,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
forceTriggerAutoCompaction(2);
// Result should rollup on language dimension
- expectedResult = ImmutableMap.of(
+ queryAndResultFields = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "added",
"%%EXPECTED_COUNT_RESULT%%", 1,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(516.0))))
);
- verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
@@ -868,12 +1096,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
// Result is not rollup
// For dim "page", result has values "Gypsy Danger" and "Striker Eureka"
- Map<String, Object> expectedResult = ImmutableMap.of(
+ Map<String, Object> queryAndResultFields = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "added",
"%%EXPECTED_COUNT_RESULT%%", 2,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
);
- verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
// Compact and filter with selector on dim "page" and value "Striker Eureka"
submitCompactionConfig(
@@ -888,12 +1116,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
forceTriggerAutoCompaction(2);
// For dim "page", result should only contain value "Striker Eureka"
- expectedResult = ImmutableMap.of(
+ queryAndResultFields = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "added",
"%%EXPECTED_COUNT_RESULT%%", 1,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(459.0))))
);
- verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
@@ -915,12 +1143,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifySegmentsCount(4);
// For dim "page", result has values "Gypsy Danger" and "Striker Eureka"
- Map<String, Object> expectedResult = ImmutableMap.of(
+ Map<String, Object> queryAndResultFields = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "added",
"%%EXPECTED_COUNT_RESULT%%", 2,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
);
- verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
// Compact and add longSum and doubleSum metrics
submitCompactionConfig(
@@ -936,19 +1164,19 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
// Result should be the same with the addition of new metrics, "double_sum_added" and "long_sum_added".
// These new metrics should have the same value as the input field "added"
- expectedResult = ImmutableMap.of(
+ queryAndResultFields = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "double_sum_added",
"%%EXPECTED_COUNT_RESULT%%", 2,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
);
- verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
- expectedResult = ImmutableMap.of(
+ queryAndResultFields = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "long_sum_added",
"%%EXPECTED_COUNT_RESULT%%", 2,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57), ImmutableList.of(459))))
);
- verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
@@ -976,12 +1204,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
// Result is not rollup
// For dim "page", result has values "Gypsy Danger" and "Striker Eureka"
- Map<String, Object> expectedResult = ImmutableMap.of(
+ Map<String, Object> queryAndResultFields = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "added",
"%%EXPECTED_COUNT_RESULT%%", 2,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
);
- verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
submitCompactionConfig(
MAX_ROWS_PER_SEGMENT_COMPACTED,
@@ -994,11 +1222,11 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
);
// Compact the MONTH segment
forceTriggerAutoCompaction(2);
- verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
// Compact the WEEK segment
forceTriggerAutoCompaction(2);
- verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
// Verify all task succeed
List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
@@ -1133,6 +1361,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
null,
null,
null,
+ null,
new MaxSizeSplitHintSpec(null, 1),
partitionsSpec,
null,
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java
index 7b7eefac6c..7c5afb1ffd 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java
@@ -81,6 +81,7 @@ public class ITAutoCompactionUpgradeTest extends AbstractIndexerTest
null,
null,
null,
+ null,
new MaxSizeSplitHintSpec(null, 1),
newPartitionsSpec,
null,
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_no_rollup_preserve_metric.json b/integration-tests/src/test/resources/indexer/wikipedia_index_no_rollup_preserve_metric.json
new file mode 100644
index 0000000000..75e6b663a9
--- /dev/null
+++ b/integration-tests/src/test/resources/indexer/wikipedia_index_no_rollup_preserve_metric.json
@@ -0,0 +1,76 @@
+{
+ "type": "index_parallel",
+ "spec": {
+ "ioConfig": {
+ "type": "index_parallel",
+ "inputSource": {
+ "type": "inline",
+ "data": "{\"isRobot\":true,\"language\":\"en\",\"timestamp\":\"2013-08-31T00:00:11.080Z\",\"flags\":\"NB\",\"isUnpatrolled\":false,\"page\":\"Salo Toraut\",\"diffUrl\":\"https://sv.wikipedia.org/w/index.php?oldid=36099284&rcid=89369918\",\"added\":31,\"comment\":\"Botskapande Indonesien omdirigering\",\"commentLength\":35,\"isNew\":true,\"isMinor\":false,\"delta\":31,\"isAnonymous\":false,\"user\":\"maytas3\",\"deltaBucket\":0.0,\"deleted\":0,\"namespace\":\"Main\"}\n"
+ },
+ "inputFormat": {
+ "type": "json"
+ },
+ "appendToExisting": true
+ },
+ "tuningConfig": {
+ "type": "index_parallel",
+ "partitionsSpec": {
+ "type": "dynamic"
+ }
+ },
+ "dataSchema": {
+ "dataSource": "%%DATASOURCE%%",
+ "timestampSpec": {
+ "column": "timestamp",
+ "format": "iso"
+ },
+ "dimensionsSpec": {
+ "dimensions": [
+ "isRobot",
+ "language",
+ "flags",
+ "isUnpatrolled",
+ "page",
+ "diffUrl",
+ {
+ "type": "long",
+ "name": "added"
+ },
+ "comment",
+ {
+ "type": "long",
+ "name": "commentLength"
+ },
+ "isNew",
+ "isMinor",
+ {
+ "type": "long",
+ "name": "delta"
+ },
+ "isAnonymous",
+ "user",
+ {
+ "type": "long",
+ "name": "deltaBucket"
+ },
+ {
+ "type": "long",
+ "name": "deleted"
+ },
+ "namespace",
+ "cityName",
+ "countryName",
+ "regionIsoCode",
+ "metroCode",
+ "countryIsoCode",
+ "regionName"
+ ]
+ },
+ "granularitySpec": {
+ "queryGranularity": "hour",
+ "rollup": false,
+ "segmentGranularity": "day"
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_rollup_preserve_metric.json b/integration-tests/src/test/resources/indexer/wikipedia_index_rollup_preserve_metric.json
new file mode 100644
index 0000000000..dacedf0ef1
--- /dev/null
+++ b/integration-tests/src/test/resources/indexer/wikipedia_index_rollup_preserve_metric.json
@@ -0,0 +1,95 @@
+{
+ "type": "index_parallel",
+ "spec": {
+ "ioConfig": {
+ "type": "index_parallel",
+ "inputSource": {
+ "type": "inline",
+ "data": "{\"isRobot\":true,\"language\":\"en\",\"timestamp\":\"2013-08-31T00:00:11.080Z\",\"flags\":\"NB\",\"isUnpatrolled\":false,\"page\":\"Salo Toraut\",\"diffUrl\":\"https://sv.wikipedia.org/w/index.php?oldid=36099284&rcid=89369918\",\"added\":31,\"comment\":\"Botskapande Indonesien omdirigering\",\"commentLength\":35,\"isNew\":true,\"isMinor\":false,\"delta\":31,\"isAnonymous\":false,\"user\":\"maytas1\",\"deltaBucket\":0.0,\"deleted\":0,\"namespace\":\"Main\"}\n{\"isRobot\" [...]
+ },
+ "inputFormat": {
+ "type": "json"
+ },
+ "appendToExisting": true
+ },
+ "dataSchema": {
+ "granularitySpec": {
+ "segmentGranularity": "day",
+ "queryGranularity": "hour",
+ "rollup": true
+ },
+ "dataSource": "%%DATASOURCE%%",
+ "timestampSpec": {
+ "column": "timestamp",
+ "format": "iso"
+ },
+ "dimensionsSpec": {
+ "dimensions": [
+ "isRobot",
+ "language",
+ "flags",
+ "isUnpatrolled",
+ "page",
+ "diffUrl",
+ "comment",
+ "isNew",
+ "isMinor",
+ "isAnonymous",
+ "namespace"
+ ]
+ },
+ "metricsSpec": [
+ {
+ "name": "count",
+ "type": "count"
+ },
+ {
+ "name": "sum_added",
+ "type": "longSum",
+ "fieldName": "added"
+ },
+ {
+ "name": "sum_commentLength",
+ "type": "longSum",
+ "fieldName": "commentLength"
+ },
+ {
+ "name": "sum_delta",
+ "type": "longSum",
+ "fieldName": "delta"
+ },
+ {
+ "name": "sum_deltaBucket",
+ "type": "longSum",
+ "fieldName": "deltaBucket"
+ },
+ {
+ "name": "sum_deleted",
+ "type": "longSum",
+ "fieldName": "deleted"
+ },
+ {
+ "name": "thetaSketch",
+ "type": "thetaSketch",
+ "fieldName": "user"
+ },
+ {
+ "name": "quantilesDoublesSketch",
+ "type": "quantilesDoublesSketch",
+ "fieldName": "delta"
+ },
+ {
+ "name": "HLLSketchBuild",
+ "type": "HLLSketchBuild",
+ "fieldName": "user"
+ }
+ ]
+ },
+ "tuningConfig": {
+ "type": "index_parallel",
+ "partitionsSpec": {
+ "type": "dynamic"
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_sketch_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_index_sketch_queries.json
new file mode 100644
index 0000000000..2b7a0625bc
--- /dev/null
+++ b/integration-tests/src/test/resources/indexer/wikipedia_index_sketch_queries.json
@@ -0,0 +1,49 @@
+[
+ {
+ "description": "timeseries, datasketch aggs, all",
+ "query":{
+ "queryType" : "timeseries",
+ "dataSource": "%%DATASOURCE%%",
+ "granularity":"day",
+ "intervals":[
+ "2013-08-31T00:00/2013-09-01T00:00"
+ ],
+ "filter":null,
+ "aggregations":[
+ {
+ "type": "HLLSketchMerge",
+ "name": "approxCountHLL",
+ "fieldName": "HLLSketchBuild",
+ "lgK": 12,
+ "tgtHllType": "HLL_4",
+ "round": true
+ },
+ {
+ "type":"thetaSketch",
+ "name":"approxCountTheta",
+ "fieldName":"thetaSketch",
+ "size":16384,
+ "shouldFinalize":true,
+ "isInputThetaSketch":false,
+ "errorBoundsStdDev":null
+ },
+ {
+ "type":"quantilesDoublesSketch",
+ "name":"quantilesSketch",
+ "fieldName":"quantilesDoublesSketch",
+ "k":128
+ }
+ ]
+ },
+ "expectedResults":[
+ {
+ "timestamp" : "2013-08-31T00:00:00.000Z",
+ "result" : {
+ "quantilesSketch":%%QUANTILESRESULT%%,
+ "approxCountTheta":%%THETARESULT%%,
+ "approxCountHLL":%%HLLRESULT%%
+ }
+ }
+ ]
+ }
+]
\ No newline at end of file
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
index 5260669f8b..1269fe1e6b 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
@@ -34,6 +34,12 @@ public abstract class AppendableIndexBuilder
protected boolean sortFacts = true;
protected int maxRowCount = 0;
protected long maxBytesInMemory = 0;
+ // When set to true, for any row that already has a metric (with the same name defined in metricSpec),
+ // the metric aggregator in metricSpec is skipped and the existing metric is unchanged. If the row does not already have
+ // the metric, then the metric aggregator is applied on the source column as usual. This should only be set for
+ // DruidInputSource since that is the only case where we can have existing metrics.
+ // This is currently only used by auto compaction and should not be used for anything else.
+ protected boolean preserveExistingMetrics = false;
protected boolean useMaxMemoryEstimates = true;
protected final Logger log = new Logger(this.getClass());
@@ -56,7 +62,7 @@ public abstract class AppendableIndexBuilder
@VisibleForTesting
public AppendableIndexBuilder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
{
- return setSimpleTestingIndexSchema(null, metrics);
+ return setSimpleTestingIndexSchema(null, null, metrics);
}
@@ -70,10 +76,15 @@ public abstract class AppendableIndexBuilder
* @return this
*/
@VisibleForTesting
- public AppendableIndexBuilder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics)
+ public AppendableIndexBuilder setSimpleTestingIndexSchema(
+ @Nullable Boolean rollup,
+ @Nullable Boolean preserveExistingMetrics,
+ final AggregatorFactory... metrics
+ )
{
IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics);
this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build();
+ this.preserveExistingMetrics = preserveExistingMetrics != null ? preserveExistingMetrics : false;
return this;
}
@@ -107,6 +118,12 @@ public abstract class AppendableIndexBuilder
return this;
}
+ public AppendableIndexBuilder setPreserveExistingMetrics(final boolean preserveExistingMetrics)
+ {
+ this.preserveExistingMetrics = preserveExistingMetrics;
+ return this;
+ }
+
public AppendableIndexBuilder setUseMaxMemoryEstimates(final boolean useMaxMemoryEstimates)
{
this.useMaxMemoryEstimates = useMaxMemoryEstimates;
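A hedged sketch of the new builder flag in use. setSimpleTestingIndexSchema(rollup, preserveExistingMetrics, metrics) and setPreserveExistingMetrics come from this diff; OnheapIncrementalIndex.Builder, setMaxRowCount, and size() are assumed from the surrounding codebase rather than shown here.

import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;

public class PreserveExistingMetricsBuilderSketch
{
    public static void main(String[] args)
    {
        IncrementalIndex index = new OnheapIncrementalIndex.Builder()
            // @VisibleForTesting helper: rollup = true, preserveExistingMetrics = true, one count metric.
            // Non-test code would call setPreserveExistingMetrics(true) next to its real index schema.
            .setSimpleTestingIndexSchema(true, true, new CountAggregatorFactory("count"))
            .setMaxRowCount(10_000)
            .build();
        System.out.println(index.size());   // 0: nothing added yet
    }
}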
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
index b36ae6872e..c2e36cade3 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
@@ -227,6 +227,7 @@ public abstract class IncrementalIndex extends AbstractIndex implements Iterable
private final AggregatorFactory[] metrics;
private final boolean deserializeComplexMetrics;
private final Metadata metadata;
+ protected final boolean preserveExistingMetrics;
private final Map<String, MetricDesc> metricDescs;
@@ -257,12 +258,20 @@ public abstract class IncrementalIndex extends AbstractIndex implements Iterable
* @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input
* value for aggregators that return metrics other than float.
* @param concurrentEventAdd flag whether or not adding of input rows should be thread-safe
+ * @param preserveExistingMetrics When set to true, for any row that already has a metric
+ * (with the same name defined in metricSpec), the metric aggregator in metricSpec
+ * is skipped and the existing metric is unchanged. If the row does not already have
+ * the metric, then the metric aggregator is applied on the source column as usual.
+ * This should only be set for DruidInputSource since that is the only case where we
+ * can have existing metrics. This is currently only used by auto compaction and
+ * should not be used for anything else.
* @param useMaxMemoryEstimates true if max values should be used to estimate memory
*/
protected IncrementalIndex(
final IncrementalIndexSchema incrementalIndexSchema,
final boolean deserializeComplexMetrics,
final boolean concurrentEventAdd,
+ final boolean preserveExistingMetrics,
final boolean useMaxMemoryEstimates
)
{
@@ -273,6 +282,7 @@ public abstract class IncrementalIndex extends AbstractIndex implements Iterable
this.metrics = incrementalIndexSchema.getMetrics();
this.rowTransformers = new CopyOnWriteArrayList<>();
this.deserializeComplexMetrics = deserializeComplexMetrics;
+ this.preserveExistingMetrics = preserveExistingMetrics;
this.useMaxMemoryEstimates = useMaxMemoryEstimates;
this.timeAndMetricsColumnCapabilities = new HashMap<>();
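To make the new constructor parameter concrete, a small sketch of the two row shapes the flag distinguishes, borrowing the rows used by the unit tests added further down (illustrative only, not part of this file):

    // A raw row: "sum_of_x" must be aggregated from the source column "x" as usual.
    InputRow raw = new MapBasedInputRow(
        1481871600000L,
        Arrays.asList("name", "host"),
        ImmutableMap.of("name", "name1", "host", "host", "x", 2)
    );
    // An already-rolled-up row: it carries "count" and "sum_of_x" directly. With
    // preserveExistingMetrics set to true, these carried values are combined with newly
    // aggregated ones instead of being re-aggregated from (possibly missing) source columns.
    InputRow rolledUp = new MapBasedInputRow(
        1481871600000L,
        Arrays.asList("name", "host"),
        ImmutableMap.of("name", "name1", "host", "host", "count", 2, "sum_of_x", 4)
    );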
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index 493180db45..a28a0a620f 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -19,6 +19,8 @@
package org.apache.druid.segment.incremental;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
@@ -52,6 +54,7 @@ import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
/**
*
@@ -109,6 +112,8 @@ public class OnheapIncrementalIndex extends IncrementalIndex
@Nullable
private volatile Map<String, ColumnSelectorFactory> selectors;
@Nullable
+ private volatile Map<String, ColumnSelectorFactory> combiningAggSelectors;
+ @Nullable
private String outOfRowsReason = null;
OnheapIncrementalIndex(
@@ -118,10 +123,13 @@ public class OnheapIncrementalIndex extends IncrementalIndex
boolean sortFacts,
int maxRowCount,
long maxBytesInMemory,
+ // preserveExistingMetrics should only be set to true for DruidInputSource since that is the only case where we can have existing metrics.
+ // This is currently only used by auto compaction and should not be used for anything else.
+ boolean preserveExistingMetrics,
boolean useMaxMemoryEstimates
)
{
- super(incrementalIndexSchema, deserializeComplexMetrics, concurrentEventAdd, useMaxMemoryEstimates);
+ super(incrementalIndexSchema, deserializeComplexMetrics, concurrentEventAdd, preserveExistingMetrics, useMaxMemoryEstimates);
this.maxRowCount = maxRowCount;
this.maxBytesInMemory = maxBytesInMemory == 0 ? Long.MAX_VALUE : maxBytesInMemory;
this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions())
@@ -182,6 +190,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
)
{
selectors = new HashMap<>();
+ combiningAggSelectors = new HashMap<>();
for (AggregatorFactory agg : metrics) {
selectors.put(
agg.getName(),
@@ -190,6 +199,16 @@ public class OnheapIncrementalIndex extends IncrementalIndex
concurrentEventAdd
)
);
+ if (preserveExistingMetrics) {
+ AggregatorFactory combiningAgg = agg.getCombiningFactory();
+ combiningAggSelectors.put(
+ combiningAgg.getName(),
+ new CachingColumnSelectorFactory(
+ makeColumnSelectorFactory(combiningAgg, rowSupplier, deserializeComplexMetrics),
+ concurrentEventAdd
+ )
+ );
+ }
}
}
@@ -214,7 +233,11 @@ public class OnheapIncrementalIndex extends IncrementalIndex
long aggSizeDelta = doAggregate(metrics, aggs, rowContainer, row, parseExceptionMessages);
totalSizeInBytes.addAndGet(useMaxMemoryEstimates ? 0 : aggSizeDelta);
} else {
- aggs = new Aggregator[metrics.length];
+ if (preserveExistingMetrics) {
+ aggs = new Aggregator[metrics.length * 2];
+ } else {
+ aggs = new Aggregator[metrics.length];
+ }
long aggSizeForRow = factorizeAggs(metrics, aggs, rowContainer, row);
aggSizeForRow += doAggregate(metrics, aggs, rowContainer, row, parseExceptionMessages);
@@ -279,23 +302,33 @@ public class OnheapIncrementalIndex extends IncrementalIndex
{
long totalInitialSizeBytes = 0L;
rowContainer.set(row);
-
final long aggReferenceSize = Long.BYTES;
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
-
+ // Creates an aggregator that aggregates from the input into the output field
if (useMaxMemoryEstimates) {
aggs[i] = agg.factorize(selectors.get(agg.getName()));
} else {
- AggregatorAndSize aggregatorAndSize =
- agg.factorizeWithSize(selectors.get(agg.getName()));
+ AggregatorAndSize aggregatorAndSize = agg.factorizeWithSize(selectors.get(agg.getName()));
aggs[i] = aggregatorAndSize.getAggregator();
totalInitialSizeBytes += aggregatorAndSize.getInitialSizeBytes();
totalInitialSizeBytes += aggReferenceSize;
}
+ // Creates an aggregator that combines the already aggregated field
+ if (preserveExistingMetrics) {
+ if (useMaxMemoryEstimates) {
+ AggregatorFactory combiningAgg = agg.getCombiningFactory();
+ aggs[i + metrics.length] = combiningAgg.factorize(combiningAggSelectors.get(combiningAgg.getName()));
+ } else {
+ AggregatorFactory combiningAgg = agg.getCombiningFactory();
+ AggregatorAndSize aggregatorAndSize = combiningAgg.factorizeWithSize(combiningAggSelectors.get(combiningAgg.getName()));
+ aggs[i + metrics.length] = aggregatorAndSize.getAggregator();
+ totalInitialSizeBytes += aggregatorAndSize.getInitialSizeBytes();
+ totalInitialSizeBytes += aggReferenceSize;
+ }
+ }
}
rowContainer.set(null);
-
return totalInitialSizeBytes;
}
@@ -315,10 +348,14 @@ public class OnheapIncrementalIndex extends IncrementalIndex
)
{
rowContainer.set(row);
-
long totalIncrementalBytes = 0L;
- for (int i = 0; i < aggs.length; i++) {
- final Aggregator agg = aggs[i];
+ for (int i = 0; i < metrics.length; i++) {
+ final Aggregator agg;
+ if (preserveExistingMetrics && row instanceof MapBasedRow && ((MapBasedRow) row).getEvent().containsKey(metrics[i].getName())) {
+ agg = aggs[i + metrics.length];
+ } else {
+ agg = aggs[i];
+ }
synchronized (agg) {
try {
if (useMaxMemoryEstimates) {
@@ -329,8 +366,13 @@ public class OnheapIncrementalIndex extends IncrementalIndex
}
catch (ParseException e) {
// "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
- log.debug(e, "Encountered parse error, skipping aggregator[%s].", metrics[i].getName());
- parseExceptionsHolder.add(e.getMessage());
+ if (preserveExistingMetrics) {
+ log.warn(e, "Failing ingestion as preserveExistingMetrics is enabled but selector of aggregator[%s] recieved incompatible type.", metrics[i].getName());
+ throw e;
+ } else {
+ log.debug(e, "Encountered parse error, skipping aggregator[%s].", metrics[i].getName());
+ parseExceptionsHolder.add(e.getMessage());
+ }
}
}
}
@@ -410,31 +452,35 @@ public class OnheapIncrementalIndex extends IncrementalIndex
@Override
public float getMetricFloatValue(int rowOffset, int aggOffset)
{
- return concurrentGet(rowOffset)[aggOffset].getFloat();
+ return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getFloat);
}
@Override
public long getMetricLongValue(int rowOffset, int aggOffset)
{
- return concurrentGet(rowOffset)[aggOffset].getLong();
+ return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getLong);
}
@Override
public Object getMetricObjectValue(int rowOffset, int aggOffset)
{
- return concurrentGet(rowOffset)[aggOffset].get();
+ return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::get);
}
@Override
protected double getMetricDoubleValue(int rowOffset, int aggOffset)
{
- return concurrentGet(rowOffset)[aggOffset].getDouble();
+ return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getDouble);
}
@Override
public boolean isNull(int rowOffset, int aggOffset)
{
- return concurrentGet(rowOffset)[aggOffset].isNull();
+ if (preserveExistingMetrics) {
+ return concurrentGet(rowOffset)[aggOffset].isNull() && concurrentGet(rowOffset)[aggOffset + getMetricAggs().length].isNull();
+ } else {
+ return concurrentGet(rowOffset)[aggOffset].isNull();
+ }
}
@Override
@@ -475,8 +521,9 @@ public class OnheapIncrementalIndex extends IncrementalIndex
}
Aggregator[] aggs = getAggsForRow(rowOffset);
- for (int i = 0; i < aggs.length; ++i) {
- theVals.put(metrics[i].getName(), aggs[i].get());
+ int aggLength = preserveExistingMetrics ? aggs.length / 2 : aggs.length;
+ for (int i = 0; i < aggLength; ++i) {
+ theVals.put(metrics[i].getName(), getMetricHelper(metrics, aggs, i, Aggregator::get));
}
if (postAggs != null) {
@@ -492,6 +539,40 @@ public class OnheapIncrementalIndex extends IncrementalIndex
}
}
+ /**
+ * Apply the given getMetricTypeFunction to retrieve the aggregated value for the given list of aggregators and offset.
+ * If the preserveExistingMetrics flag is set, then this method will combine values from two aggregators: the aggregator
+ * that aggregates from the input into the output field and the aggregator that combines the already aggregated field, as needed.
+ */
+ private <T> T getMetricHelper(AggregatorFactory[] metrics, Aggregator[] aggs, int aggOffset, Function<Aggregator, T> getMetricTypeFunction)
+ {
+ if (preserveExistingMetrics) {
+ // Since the preserveExistingMetrics flag is set, we may have to retrieve the aggregated value
+ // from two aggregators: the aggregator that aggregates from the input into the output field and the
+ // aggregator that combines the already aggregated field
+ if (aggs[aggOffset].isNull()) {
+ // If the aggregator that aggregates from the input into the output field holds no value, then we get the value from the
+ // aggregator that combines the already aggregated field
+ return getMetricTypeFunction.apply(aggs[aggOffset + metrics.length]);
+ } else if (aggs[aggOffset + metrics.length].isNull()) {
+ // If the aggregator that combines the already aggregated field holds no value, then we get the value from the
+ // aggregator that aggregates from the input into the output field
+ return getMetricTypeFunction.apply(aggs[aggOffset]);
+ } else {
+ // Since both aggregators hold values, we retrieve the values from both
+ // aggregators and combine them
+ AggregatorFactory aggregatorFactory = metrics[aggOffset];
+ T aggregatedFromSource = getMetricTypeFunction.apply(aggs[aggOffset]);
+ T aggregatedFromCombined = getMetricTypeFunction.apply(aggs[aggOffset + metrics.length]);
+ return (T) aggregatorFactory.combine(aggregatedFromSource, aggregatedFromCombined);
+ }
+ } else {
+ // If the preserveExistingMetrics flag is not set, then we simply get the metric from the list of Aggregators, aggs, using the
+ // given aggOffset
+ return getMetricTypeFunction.apply(aggs[aggOffset]);
+ }
+ }
+
/**
* Clear out maps to allow GC
* NOTE: This is NOT thread-safe with add... so make sure all the adding is DONE before closing
@@ -506,6 +587,9 @@ public class OnheapIncrementalIndex extends IncrementalIndex
if (selectors != null) {
selectors.clear();
}
+ if (combiningAggSelectors != null) {
+ combiningAggSelectors.clear();
+ }
}
/**
@@ -571,6 +655,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
sortFacts,
maxRowCount,
maxBytesInMemory,
+ preserveExistingMetrics,
useMaxMemoryEstimates
);
}
@@ -578,12 +663,39 @@ public class OnheapIncrementalIndex extends IncrementalIndex
public static class Spec implements AppendableIndexSpec
{
+ private static final boolean DEFAULT_PRESERVE_EXISTING_METRICS = false;
public static final String TYPE = "onheap";
+ // When set to true, for any row that already has a metric (with the same name as defined in metricSpec),
+ // the metric aggregator in metricSpec is skipped and the existing metric is unchanged. If the row does not already have
+ // the metric, then the metric aggregator is applied to the source column as usual. This should only be set for
+ // DruidInputSource since that is the only case where we can have existing metrics.
+ // This is currently only used by auto compaction and should not be used for anything else.
+ final boolean preserveExistingMetrics;
+
+ public Spec()
+ {
+ this.preserveExistingMetrics = DEFAULT_PRESERVE_EXISTING_METRICS;
+ }
+
+ @JsonCreator
+ public Spec(
+ final @JsonProperty("preserveExistingMetrics") @Nullable Boolean preserveExistingMetrics
+ )
+ {
+ this.preserveExistingMetrics = preserveExistingMetrics != null ? preserveExistingMetrics : DEFAULT_PRESERVE_EXISTING_METRICS;
+ }
+
+ @JsonProperty
+ public boolean isPreserveExistingMetrics()
+ {
+ return preserveExistingMetrics;
+ }
+
@Override
public AppendableIndexBuilder builder()
{
- return new Builder();
+ return new Builder().setPreserveExistingMetrics(preserveExistingMetrics);
}
@Override
@@ -596,15 +708,22 @@ public class OnheapIncrementalIndex extends IncrementalIndex
}
@Override
- public boolean equals(Object that)
+ public boolean equals(Object o)
{
- return that.getClass().equals(this.getClass());
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Spec spec = (Spec) o;
+ return preserveExistingMetrics == spec.preserveExistingMetrics;
}
@Override
public int hashCode()
{
- return Objects.hash(this.getClass());
+ return Objects.hash(preserveExistingMetrics);
}
}
}
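A short numeric sketch of the combining path in getMetricHelper above, using values consistent with the rollup tests that follow; the direct combine() call is only meant to show the arithmetic, it is not how the index invokes it:

    // With preserveExistingMetrics=true each metric gets two aggregators: one fed by raw input
    // rows and one fed by rows that already carry the metric. When both hold a value,
    // getMetricHelper merges them through the factory's combine():
    LongSumAggregatorFactory sumOfX = new LongSumAggregatorFactory("sum_of_x", "x");
    Object combined = sumOfX.combine(5L, 9L);  // 2 + 3 from raw rows, 4 + 5 from existing metrics
    // combined is 14, matching the rolled-up expectation asserted in IncrementalIndexTest below.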
diff --git a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
index 646f1d66bf..3257ec6d49 100644
--- a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
@@ -29,6 +29,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -37,6 +38,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.query.Druids;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.QueryPlus;
@@ -70,6 +72,7 @@ import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -92,23 +95,30 @@ import java.util.concurrent.atomic.AtomicInteger;
public class IncrementalIndexTest extends InitializedNullHandlingTest
{
public final IncrementalIndexCreator indexCreator;
+ private final boolean isPreserveExistingMetrics;
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
@Rule
public final CloserRule closer = new CloserRule(false);
- public IncrementalIndexTest(String indexType, String mode) throws JsonProcessingException
+ public IncrementalIndexTest(String indexType, String mode, boolean isPreserveExistingMetrics) throws JsonProcessingException
{
+ this.isPreserveExistingMetrics = isPreserveExistingMetrics;
indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, (builder, args) -> builder
- .setSimpleTestingIndexSchema("rollup".equals(mode), (AggregatorFactory[]) args[0])
+ .setSimpleTestingIndexSchema("rollup".equals(mode), isPreserveExistingMetrics, (AggregatorFactory[]) args[0])
.setMaxRowCount(1_000_000)
.build()
));
}
- @Parameterized.Parameters(name = "{index}: {0}, {1}")
+ @Parameterized.Parameters(name = "{index}: {0}, {1}, {2}")
public static Collection<?> constructorFeeder()
{
- return IncrementalIndexCreator.indexTypeCartesianProduct(ImmutableList.of("rollup", "plain"));
+ return IncrementalIndexCreator.indexTypeCartesianProduct(
+ ImmutableList.of("rollup", "plain"),
+ ImmutableList.of(true, false)
+ );
}
public static AggregatorFactory[] getDefaultCombiningAggregatorFactories()
@@ -155,7 +165,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
}
return new OnheapIncrementalIndex.Builder()
- .setSimpleTestingIndexSchema(false, aggregatorFactories)
+ .setSimpleTestingIndexSchema(false, false, aggregatorFactories)
.setMaxRowCount(1000000)
.build();
}
@@ -721,4 +731,269 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
Assert.assertEquals(2, index.size());
}
+
+ @Test
+ public void testSchemaRollupWithRowWithExistingMetricsAndWithoutMetric() throws IndexSizeExceededException
+ {
+ AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{
+ new CountAggregatorFactory("count"),
+ new LongSumAggregatorFactory("sum_of_x", "x")
+ };
+ final IncrementalIndex index = indexCreator.createIndex((Object) aggregatorFactories);
+ index.add(
+ new MapBasedInputRow(
+ 1481871600000L,
+ Arrays.asList("name", "host"),
+ ImmutableMap.of("name", "name1", "host", "host", "x", 2)
+ )
+ );
+ index.add(
+ new MapBasedInputRow(
+ 1481871600000L,
+ Arrays.asList("name", "host"),
+ ImmutableMap.of("name", "name1", "host", "host", "x", 3)
+ )
+ );
+ index.add(
+ new MapBasedInputRow(
+ 1481871600000L,
+ Arrays.asList("name", "host"),
+ ImmutableMap.of("name", "name1", "host", "host", "count", 2, "sum_of_x", 4)
+ )
+ );
+ index.add(
+ new MapBasedInputRow(
+ 1481871600000L,
+ Arrays.asList("name", "host"),
+ ImmutableMap.of("name", "name1", "host", "host", "count", 3, "sum_of_x", 5)
+ )
+ );
+
+ Assert.assertEquals(index.isRollup() ? 1 : 4, index.size());
+ Iterator<Row> iterator = index.iterator();
+ int rowCount = 0;
+ while (iterator.hasNext()) {
+ rowCount++;
+ Row row = iterator.next();
+ Assert.assertEquals(1481871600000L, row.getTimestampFromEpoch());
+ if (index.isRollup()) {
+ // All rows are rolled up into one row
+ Assert.assertEquals(isPreserveExistingMetrics ? 7 : 4, row.getMetric("count").intValue());
+ Assert.assertEquals(isPreserveExistingMetrics ? 14 : 5, row.getMetric("sum_of_x").intValue());
+ } else {
+ // We still have 4 rows
+ if (rowCount == 1 || rowCount == 2) {
+ Assert.assertEquals(1, row.getMetric("count").intValue());
+ Assert.assertEquals(1 + rowCount, row.getMetric("sum_of_x").intValue());
+ } else {
+ if (isPreserveExistingMetrics) {
+ Assert.assertEquals(rowCount - 1, row.getMetric("count").intValue());
+ Assert.assertEquals(1 + rowCount, row.getMetric("sum_of_x").intValue());
+ } else {
+ Assert.assertEquals(1, row.getMetric("count").intValue());
+ // The rows do not have the dim "x", hence the metric is null (useDefaultValueForNull=false) or 0 (useDefaultValueForNull=true)
+ Assert.assertEquals(NullHandling.sqlCompatible() ? null : 0L, row.getMetric("sum_of_x"));
+ }
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testSchemaRollupWithRowWithOnlyExistingMetrics() throws IndexSizeExceededException
+ {
+ AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{
+ new CountAggregatorFactory("count"),
+ new LongSumAggregatorFactory("sum_of_x", "x")
+ };
+ final IncrementalIndex index = indexCreator.createIndex((Object) aggregatorFactories);
+ index.add(
+ new MapBasedInputRow(
+ 1481871600000L,
+ Arrays.asList("name", "host"),
+ ImmutableMap.of("name", "name1", "host", "host", "count", 2, "sum_of_x", 4)
+ )
+ );
+ index.add(
+ new MapBasedInputRow(
+ 1481871600000L,
+ Arrays.asList("name", "host"),
+ ImmutableMap.of("name", "name1", "host", "host", "count", 3, "x", 3, "sum_of_x", 5)
+ )
+ );
+
+ Assert.assertEquals(index.isRollup() ? 1 : 2, index.size());
+ Iterator<Row> iterator = index.iterator();
+ int rowCount = 0;
+ while (iterator.hasNext()) {
+ rowCount++;
+ Row row = iterator.next();
+ Assert.assertEquals(1481871600000L, row.getTimestampFromEpoch());
+ if (index.isRollup()) {
+ // All rows are rolled up into one row
+ Assert.assertEquals(isPreserveExistingMetrics ? 5 : 2, row.getMetric("count").intValue());
+ Assert.assertEquals(isPreserveExistingMetrics ? 9 : 3, row.getMetric("sum_of_x").intValue());
+ } else {
+ // We still have 2 rows
+ if (rowCount == 1) {
+ if (isPreserveExistingMetrics) {
+ Assert.assertEquals(2, row.getMetric("count").intValue());
+ Assert.assertEquals(4, row.getMetric("sum_of_x").intValue());
+ } else {
+ Assert.assertEquals(1, row.getMetric("count").intValue());
+ // The row does not have the dim "x", hence the metric is null (useDefaultValueForNull=false) or 0 (useDefaultValueForNull=true)
+ Assert.assertEquals(NullHandling.sqlCompatible() ? null : 0L, row.getMetric("sum_of_x"));
+ }
+ } else {
+ Assert.assertEquals(isPreserveExistingMetrics ? 3 : 1, row.getMetric("count").intValue());
+ Assert.assertEquals(isPreserveExistingMetrics ? 5 : 3, row.getMetric("sum_of_x").intValue());
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testSchemaRollupWithRowsWithNoMetrics() throws IndexSizeExceededException
+ {
+ AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{
+ new CountAggregatorFactory("count"),
+ new LongSumAggregatorFactory("sum_of_x", "x")
+ };
+ final IncrementalIndex index = indexCreator.createIndex((Object) aggregatorFactories);
+ index.add(
+ new MapBasedInputRow(
+ 1481871600000L,
+ Arrays.asList("name", "host"),
+ ImmutableMap.of("name", "name1", "host", "host", "x", 4)
+ )
+ );
+ index.add(
+ new MapBasedInputRow(
+ 1481871600000L,
+ Arrays.asList("name", "host"),
+ ImmutableMap.of("name", "name1", "host", "host", "x", 3)
+ )
+ );
+
+ Assert.assertEquals(index.isRollup() ? 1 : 2, index.size());
+ Iterator<Row> iterator = index.iterator();
+ int rowCount = 0;
+ while (iterator.hasNext()) {
+ rowCount++;
+ Row row = iterator.next();
+ Assert.assertEquals(1481871600000L, row.getTimestampFromEpoch());
+ if (index.isRollup()) {
+ // All rows are rolled up into one row
+ Assert.assertEquals(2, row.getMetric("count").intValue());
+ Assert.assertEquals(7, row.getMetric("sum_of_x").intValue());
+ } else {
+ // We still have 2 rows
+ if (rowCount == 1) {
+ Assert.assertEquals(1, row.getMetric("count").intValue());
+ Assert.assertEquals(4, row.getMetric("sum_of_x").intValue());
+ } else {
+ Assert.assertEquals(1, row.getMetric("count").intValue());
+ Assert.assertEquals(3, row.getMetric("sum_of_x").intValue());
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testSchemaRollupWithRowWithMixedTypeMetrics() throws IndexSizeExceededException
+ {
+ if (isPreserveExistingMetrics) {
+ expectedException.expect(ParseException.class);
+ }
+ AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{
+ new CountAggregatorFactory("count"),
+ new LongSumAggregatorFactory("sum_of_x", "x")
+ };
+ final IncrementalIndex index = indexCreator.createIndex((Object) aggregatorFactories);
+ index.add(
+ new MapBasedInputRow(
+ 1481871600000L,
+ Arrays.asList("name", "host"),
+ ImmutableMap.of("name", "name1", "host", "host", "count", "not a number 1", "sum_of_x", 4)
+ )
+ );
+ index.add(
+ new MapBasedInputRow(
+ 1481871600000L,
+ Arrays.asList("name", "host"),
+ ImmutableMap.of("name", "name1", "host", "host", "count", 3, "x", 3, "sum_of_x", "not a number 2")
+ )
+ );
+
+ Assert.assertEquals(index.isRollup() ? 1 : 2, index.size());
+ Iterator<Row> iterator = index.iterator();
+ int rowCount = 0;
+ while (iterator.hasNext()) {
+ rowCount++;
+ Row row = iterator.next();
+ Assert.assertEquals(1481871600000L, row.getTimestampFromEpoch());
+ if (index.isRollup()) {
+ // All rows are rolled up into one row
+ Assert.assertEquals(2, row.getMetric("count").intValue());
+ Assert.assertEquals(3, row.getMetric("sum_of_x").intValue());
+ } else {
+ // We still have 2 rows
+ if (rowCount == 1) {
+ Assert.assertEquals(1, row.getMetric("count").intValue());
+ // The row does not have the dim "x", hence the metric is null (useDefaultValueForNull=false) or 0 (useDefaultValueForNull=true)
+ Assert.assertEquals(NullHandling.sqlCompatible() ? null : 0L, row.getMetric("sum_of_x"));
+ } else {
+ Assert.assertEquals(1, row.getMetric("count").intValue());
+ Assert.assertEquals(3, row.getMetric("sum_of_x").intValue());
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testSchemaRollupWithRowsWithNonRolledUpSameColumnName() throws IndexSizeExceededException
+ {
+ AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{
+ new CountAggregatorFactory("count"),
+ new LongSumAggregatorFactory("sum_of_x", "x")
+ };
+ final IncrementalIndex index = indexCreator.createIndex((Object) aggregatorFactories);
+ index.add(
+ new MapBasedInputRow(
+ 1481871600000L,
+ Arrays.asList("name", "host"),
+ ImmutableMap.of("name", "name1", "sum_of_x", 100, "x", 4)
+ )
+ );
+ index.add(
+ new MapBasedInputRow(
+ 1481871600000L,
+ Arrays.asList("name", "host"),
+ ImmutableMap.of("name", "name1", "sum_of_x", 100, "x", 3)
+ )
+ );
+
+ Assert.assertEquals(index.isRollup() ? 1 : 2, index.size());
+ Iterator<Row> iterator = index.iterator();
+ int rowCount = 0;
+ while (iterator.hasNext()) {
+ rowCount++;
+ Row row = iterator.next();
+ Assert.assertEquals(1481871600000L, row.getTimestampFromEpoch());
+ if (index.isRollup()) {
+ // All rows are rolled up into one row
+ Assert.assertEquals(2, row.getMetric("count").intValue());
+ Assert.assertEquals(isPreserveExistingMetrics ? 200 : 7, row.getMetric("sum_of_x").intValue());
+ } else {
+ // We still have 2 rows
+ if (rowCount == 1) {
+ Assert.assertEquals(1, row.getMetric("count").intValue());
+ Assert.assertEquals(isPreserveExistingMetrics ? 100 : 4, row.getMetric("sum_of_x").intValue());
+ } else {
+ Assert.assertEquals(1, row.getMetric("count").intValue());
+ Assert.assertEquals(isPreserveExistingMetrics ? 100 : 3, row.getMetric("sum_of_x").intValue());
+ }
+ }
+ }
+ }
}
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAdapterTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAdapterTest.java
index 5200ada8ef..096d5a0aef 100644
--- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAdapterTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAdapterTest.java
@@ -62,7 +62,7 @@ public class IncrementalIndexAdapterTest extends InitializedNullHandlingTest
public IncrementalIndexAdapterTest(String indexType) throws JsonProcessingException
{
indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, (builder, args) -> builder
- .setSimpleTestingIndexSchema("rollup".equals(args[0]), new CountAggregatorFactory("count"))
+ .setSimpleTestingIndexSchema("rollup".equals(args[0]), null, new CountAggregatorFactory("count"))
.setMaxRowCount(1_000_000)
.build()
));
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
index b33d16b07d..02d43e2750 100644
--- a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
@@ -124,6 +124,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
sortFacts,
maxRowCount,
maxBytesInMemory,
+ false,
true
);
}
@@ -147,6 +148,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
true,
maxRowCount,
maxBytesInMemory,
+ false,
true
);
}
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java
index 3e3d215e58..f35257e244 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java
@@ -25,6 +25,8 @@ import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.joda.time.Duration;
@@ -72,6 +74,8 @@ public class ClientCompactionTaskQueryTuningConfig
private final Integer maxNumSegmentsToMerge;
@Nullable
private final Integer totalNumMergeTasks;
+ @Nullable
+ private final AppendableIndexSpec appendableIndexSpec;
public static ClientCompactionTaskQueryTuningConfig from(
@Nullable UserCompactionTaskQueryTuningConfig userCompactionTaskQueryTuningConfig,
@@ -81,6 +85,7 @@ public class ClientCompactionTaskQueryTuningConfig
if (userCompactionTaskQueryTuningConfig == null) {
return new ClientCompactionTaskQueryTuningConfig(
maxRowsPerSegment,
+ new OnheapIncrementalIndex.Spec(true),
null,
null,
null,
@@ -100,8 +105,12 @@ public class ClientCompactionTaskQueryTuningConfig
null
);
} else {
+ AppendableIndexSpec appendableIndexSpecToUse = userCompactionTaskQueryTuningConfig.getAppendableIndexSpec() != null
+ ? userCompactionTaskQueryTuningConfig.getAppendableIndexSpec()
+ : new OnheapIncrementalIndex.Spec(true);
return new ClientCompactionTaskQueryTuningConfig(
maxRowsPerSegment,
+ appendableIndexSpecToUse,
userCompactionTaskQueryTuningConfig.getMaxRowsInMemory(),
userCompactionTaskQueryTuningConfig.getMaxBytesInMemory(),
userCompactionTaskQueryTuningConfig.getMaxTotalRows(),
@@ -126,6 +135,7 @@ public class ClientCompactionTaskQueryTuningConfig
@JsonCreator
public ClientCompactionTaskQueryTuningConfig(
@JsonProperty("maxRowsPerSegment") @Deprecated @Nullable Integer maxRowsPerSegment,
+ @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@JsonProperty("maxTotalRows") @Deprecated @Nullable Long maxTotalRows,
@@ -146,6 +156,7 @@ public class ClientCompactionTaskQueryTuningConfig
)
{
this.maxRowsPerSegment = maxRowsPerSegment;
+ this.appendableIndexSpec = appendableIndexSpec;
this.maxRowsInMemory = maxRowsInMemory;
this.maxBytesInMemory = maxBytesInMemory;
this.maxTotalRows = maxTotalRows;
@@ -306,6 +317,13 @@ public class ClientCompactionTaskQueryTuningConfig
return totalNumMergeTasks;
}
+ @JsonProperty
+ @Nullable
+ public AppendableIndexSpec getAppendableIndexSpec()
+ {
+ return appendableIndexSpec;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -333,7 +351,8 @@ public class ClientCompactionTaskQueryTuningConfig
Objects.equals(chatHandlerTimeout, that.chatHandlerTimeout) &&
Objects.equals(chatHandlerNumRetries, that.chatHandlerNumRetries) &&
Objects.equals(maxNumSegmentsToMerge, that.maxNumSegmentsToMerge) &&
- Objects.equals(totalNumMergeTasks, that.totalNumMergeTasks);
+ Objects.equals(totalNumMergeTasks, that.totalNumMergeTasks) &&
+ Objects.equals(appendableIndexSpec, that.appendableIndexSpec);
}
@Override
@@ -357,7 +376,8 @@ public class ClientCompactionTaskQueryTuningConfig
chatHandlerTimeout,
chatHandlerNumRetries,
maxNumSegmentsToMerge,
- totalNumMergeTasks
+ totalNumMergeTasks,
+ appendableIndexSpec
);
}
@@ -383,6 +403,7 @@ public class ClientCompactionTaskQueryTuningConfig
", chatHandlerNumRetries=" + chatHandlerNumRetries +
", maxNumSegmentsToMerge=" + maxNumSegmentsToMerge +
", totalNumMergeTasks=" + totalNumMergeTasks +
+ ", appendableIndexSpec=" + appendableIndexSpec +
'}';
}
}
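A minimal sketch of the defaulting behavior introduced in from() above, using only names that appear in this diff:

    // When no user tuning config is supplied, auto compaction now defaults to preserving
    // existing metrics while re-ingesting already-rolled-up segments.
    ClientCompactionTaskQueryTuningConfig config = ClientCompactionTaskQueryTuningConfig.from(null, null);
    AppendableIndexSpec spec = config.getAppendableIndexSpec();  // equals new OnheapIncrementalIndex.Spec(true)
    // A non-null appendableIndexSpec on the user's UserCompactionTaskQueryTuningConfig takes precedence.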
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfig.java
index 38f559036b..07ab96c343 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfig.java
@@ -26,6 +26,7 @@ import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Duration;
@@ -36,6 +37,7 @@ public class UserCompactionTaskQueryTuningConfig extends ClientCompactionTaskQue
@JsonCreator
public UserCompactionTaskQueryTuningConfig(
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
+ @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@JsonProperty("maxTotalRows") @Deprecated @Nullable Long maxTotalRows,
@JsonProperty("splitHintSpec") @Nullable SplitHintSpec splitHintSpec,
@@ -56,6 +58,7 @@ public class UserCompactionTaskQueryTuningConfig extends ClientCompactionTaskQue
{
super(
null,
+ appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
maxTotalRows,
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
index c643da154c..6725dd3428 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
@@ -36,6 +36,7 @@ import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.BitmapSerde.DefaultBitmapSerdeFactory;
import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.joda.time.Duration;
@@ -123,6 +124,7 @@ public class DataSourceCompactionConfigTest extends InitializedNullHandlingTest
null,
new Period(3600),
new UserCompactionTaskQueryTuningConfig(
+ null,
null,
null,
10000L,
@@ -170,6 +172,7 @@ public class DataSourceCompactionConfigTest extends InitializedNullHandlingTest
10000,
new Period(3600),
new UserCompactionTaskQueryTuningConfig(
+ null,
null,
null,
10000L,
@@ -213,6 +216,47 @@ public class DataSourceCompactionConfigTest extends InitializedNullHandlingTest
{
final UserCompactionTaskQueryTuningConfig tuningConfig = new UserCompactionTaskQueryTuningConfig(
40000,
+ null,
+ 2000L,
+ null,
+ new SegmentsSplitHintSpec(new HumanReadableBytes(100000L), null),
+ new DynamicPartitionsSpec(1000, 20000L),
+ new IndexSpec(
+ new DefaultBitmapSerdeFactory(),
+ CompressionStrategy.LZ4,
+ CompressionStrategy.LZF,
+ LongEncodingStrategy.LONGS
+ ),
+ new IndexSpec(
+ new DefaultBitmapSerdeFactory(),
+ CompressionStrategy.LZ4,
+ CompressionStrategy.UNCOMPRESSED,
+ LongEncodingStrategy.AUTO
+ ),
+ 2,
+ 1000L,
+ TmpFileSegmentWriteOutMediumFactory.instance(),
+ 100,
+ 5,
+ 1000L,
+ new Duration(3000L),
+ 7,
+ 1000,
+ 100
+ );
+
+ final String json = OBJECT_MAPPER.writeValueAsString(tuningConfig);
+ final UserCompactionTaskQueryTuningConfig fromJson =
+ OBJECT_MAPPER.readValue(json, UserCompactionTaskQueryTuningConfig.class);
+ Assert.assertEquals(tuningConfig, fromJson);
+ }
+
+ @Test
+ public void testSerdeUserCompactionTuningConfigWithAppendableIndexSpec() throws IOException
+ {
+ final UserCompactionTaskQueryTuningConfig tuningConfig = new UserCompactionTaskQueryTuningConfig(
+ 40000,
+ new OnheapIncrementalIndex.Spec(true),
2000L,
null,
new SegmentsSplitHintSpec(new HumanReadableBytes(100000L), null),
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfigTest.java
index 7aa6540abf..dc52f78422 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfigTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.BitmapSerde.DefaultBitmapSerdeFactory;
import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.joda.time.Duration;
import org.junit.Assert;
@@ -60,6 +61,7 @@ public class UserCompactionTaskQueryTuningConfigTest
null,
null,
null,
+ null,
null
);
final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -75,6 +77,7 @@ public class UserCompactionTaskQueryTuningConfigTest
{
final UserCompactionTaskQueryTuningConfig tuningConfig = new UserCompactionTaskQueryTuningConfig(
40000,
+ new OnheapIncrementalIndex.Spec(true),
2000L,
null,
new SegmentsSplitHintSpec(new HumanReadableBytes(42L), null),
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index b25ebbaaf1..8cc8fad3f1 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -722,6 +722,7 @@ public class CompactSegmentsTest
null,
null,
null,
+ null,
partitionsSpec,
null,
null,
@@ -786,6 +787,7 @@ public class CompactSegmentsTest
null,
null,
null,
+ null,
partitionsSpec,
null,
null,
@@ -844,6 +846,7 @@ public class CompactSegmentsTest
null,
null,
null,
+ null,
partitionsSpec,
null,
null,
@@ -902,6 +905,7 @@ public class CompactSegmentsTest
null,
null,
null,
+ null,
partitionsSpec,
null,
null,
@@ -968,6 +972,7 @@ public class CompactSegmentsTest
null,
null,
null,
+ null,
partitionsSpec,
null,
null,
@@ -1029,6 +1034,7 @@ public class CompactSegmentsTest
null,
null,
null,
+ null,
partitionsSpec,
null,
null,
@@ -1089,6 +1095,7 @@ public class CompactSegmentsTest
null,
null,
null,
+ null,
partitionsSpec,
null,
null,
@@ -1190,6 +1197,7 @@ public class CompactSegmentsTest
null,
null,
null,
+ null,
partitionsSpec,
null,
null,
@@ -1315,6 +1323,7 @@ public class CompactSegmentsTest
null,
null,
null,
+ null,
partitionsSpec,
null,
null,
@@ -1376,6 +1385,7 @@ public class CompactSegmentsTest
null,
null,
null,
+ null,
partitionsSpec,
null,
null,
@@ -1441,6 +1451,7 @@ public class CompactSegmentsTest
null,
null,
null,
+ null,
partitionsSpec,
null,
null,
@@ -1587,6 +1598,7 @@ public class CompactSegmentsTest
null,
null,
null,
+ null,
partitionsSpec,
null,
null,
@@ -1683,6 +1695,7 @@ public class CompactSegmentsTest
null,
null,
null,
+ null,
partitionsSpec,
null,
null,
@@ -2003,6 +2016,7 @@ public class CompactSegmentsTest
null,
null,
null,
+ null,
partitionsSpec,
null,
null,
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
index 3ad596347a..fd7549c629 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
@@ -120,6 +120,7 @@ public class NewestSegmentFirstIteratorTest
null,
null,
null,
+ null,
new DynamicPartitionsSpec(null, null),
null,
null,
@@ -163,6 +164,7 @@ public class NewestSegmentFirstIteratorTest
null,
null,
null,
+ null,
new DynamicPartitionsSpec(null, 1000L),
null,
null,
@@ -206,6 +208,7 @@ public class NewestSegmentFirstIteratorTest
null,
null,
null,
+ null,
new DynamicPartitionsSpec(100, 1000L),
null,
null,
@@ -245,6 +248,7 @@ public class NewestSegmentFirstIteratorTest
100,
null,
new UserCompactionTaskQueryTuningConfig(
+ null,
null,
null,
1000L,
@@ -292,6 +296,7 @@ public class NewestSegmentFirstIteratorTest
null,
null,
null,
+ null,
new DynamicPartitionsSpec(null, null),
null,
null,
@@ -331,6 +336,7 @@ public class NewestSegmentFirstIteratorTest
null,
null,
new UserCompactionTaskQueryTuningConfig(
+ null,
null,
null,
1000L,
@@ -378,6 +384,7 @@ public class NewestSegmentFirstIteratorTest
null,
null,
null,
+ null,
new HashedPartitionsSpec(null, 10, ImmutableList.of("dim")),
null,
null,
@@ -421,6 +428,7 @@ public class NewestSegmentFirstIteratorTest
null,
null,
null,
+ null,
new SingleDimensionPartitionsSpec(10000, null, "dim", false),
null,
null,
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
index a29bd9eceb..2026873103 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
@@ -45,10 +45,12 @@ import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.ConciseBitmapSerdeFactory;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskTransformConfig;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
@@ -1490,6 +1492,96 @@ public class NewestSegmentFirstPolicyTest
Assert.assertFalse(iterator.hasNext());
}
+ @Test
+ public void testIteratorDoesNotReturnSegmentWithChangingAppendableIndexSpec()
+ {
+ NullHandling.initializeForTests();
+ PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+ final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+ new SegmentGenerateSpec(
+ Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
+ new Period("P1D"),
+ null,
+ new CompactionState(
+ partitionsSpec,
+ null,
+ null,
+ null,
+ mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {}),
+ null
+ )
+ )
+ );
+
+ CompactionSegmentIterator iterator = policy.reset(
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
+ 130000,
+ new Period("P0D"),
+ null,
+ null,
+ null,
+ new UserCompactionTaskQueryTuningConfig(
+ null,
+ new OnheapIncrementalIndex.Spec(true),
+ null,
+ 1000L,
+ null,
+ partitionsSpec,
+ new IndexSpec(),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ ),
+ null
+ )),
+ ImmutableMap.of(DATA_SOURCE, timeline),
+ Collections.emptyMap()
+ );
+ Assert.assertFalse(iterator.hasNext());
+
+ iterator = policy.reset(
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
+ 130000,
+ new Period("P0D"),
+ null,
+ null,
+ null,
+ new UserCompactionTaskQueryTuningConfig(
+ null,
+ new OnheapIncrementalIndex.Spec(false),
+ null,
+ 1000L,
+ null,
+ partitionsSpec,
+ new IndexSpec(),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ ),
+ null
+ )),
+ ImmutableMap.of(DATA_SOURCE, timeline),
+ Collections.emptyMap()
+ );
+ Assert.assertFalse(iterator.hasNext());
+ }
+
private static void assertCompactSegmentIntervals(
CompactionSegmentIterator iterator,
Period segmentPeriod,
@@ -1586,7 +1678,19 @@ public class NewestSegmentFirstPolicyTest
UserCompactionTaskGranularityConfig granularitySpec
)
{
- return createCompactionConfig(inputSegmentSizeBytes, skipOffsetFromLatest, granularitySpec, null, null, null);
+ return createCompactionConfig(inputSegmentSizeBytes, skipOffsetFromLatest, granularitySpec, null, null, null, null);
+ }
+
+ private DataSourceCompactionConfig createCompactionConfig(
+ long inputSegmentSizeBytes,
+ Period skipOffsetFromLatest,
+ UserCompactionTaskGranularityConfig granularitySpec,
+ UserCompactionTaskDimensionsConfig dimensionsSpec,
+ UserCompactionTaskTransformConfig transformSpec,
+ AggregatorFactory[] metricsSpec
+ )
+ {
+ return createCompactionConfig(inputSegmentSizeBytes, skipOffsetFromLatest, granularitySpec, dimensionsSpec, transformSpec, null, metricsSpec);
}
private DataSourceCompactionConfig createCompactionConfig(
@@ -1595,6 +1699,7 @@ public class NewestSegmentFirstPolicyTest
UserCompactionTaskGranularityConfig granularitySpec,
UserCompactionTaskDimensionsConfig dimensionsSpec,
UserCompactionTaskTransformConfig transformSpec,
+ UserCompactionTaskQueryTuningConfig tuningConfig,
AggregatorFactory[] metricsSpec
)
{
@@ -1604,7 +1709,7 @@ public class NewestSegmentFirstPolicyTest
inputSegmentSizeBytes,
null,
skipOffsetFromLatest,
- null,
+ tuningConfig,
granularitySpec,
dimensionsSpec,
metricsSpec,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org