You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ma...@apache.org on 2022/04/15 22:47:54 UTC
[druid] branch master updated: Fix bug in auto compaction preserveExistingMetrics feature (#12438)
This is an automated email from the ASF dual-hosted git repository.
maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c25a556827 Fix bug in auto compaction preserveExistingMetrics feature (#12438)
c25a556827 is described below
commit c25a5568275c506115914cc5622a829c89f8e384
Author: Maytas Monsereenusorn <ma...@apache.org>
AuthorDate: Fri Apr 15 15:47:47 2022 -0700
Fix bug in auto compaction preserveExistingMetrics feature (#12438)
* fix bug
* fix test
* fix IT
---
.../coordinator/duty/ITAutoCompactionTest.java | 96 ++++++++++++++++
.../incremental/OnheapIncrementalIndex.java | 10 +-
.../druid/segment/data/IncrementalIndexTest.java | 69 ++++++++++++
.../ClientCompactionTaskQueryTuningConfig.java | 7 +-
.../server/coordinator/duty/CompactSegments.java | 2 +-
.../duty/NewestSegmentFirstIterator.java | 2 +-
.../coordinator/duty/CompactSegmentsTest.java | 125 +++++++++++++++++++++
.../duty/NewestSegmentFirstIteratorTest.java | 18 +--
.../duty/NewestSegmentFirstPolicyTest.java | 26 ++---
9 files changed, 323 insertions(+), 32 deletions(-)
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index 7dd7591615..32d563f25b 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -45,6 +45,7 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.query.aggregation.FloatSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory;
@@ -121,6 +122,101 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
fullDatasourceName = "wikipedia_index_test_" + UUID.randomUUID() + config.getExtraDatasourceNameSuffix();
}
+ @Test
+ public void testAutoCompactionRowWithMetricAndRowWithoutMetricShouldPreserveExistingMetricsUsingAggregatorWithDifferentReturnType() throws Exception
+ {
+ // added = null, count = 2, sum_added = 62, quantilesDoublesSketch = 2, thetaSketch = 2, HLLSketchBuild = 2
+ loadData(INDEX_TASK_WITH_ROLLUP_FOR_PRESERVE_METRICS);
+ // added = 31, count = null, sum_added = null, quantilesDoublesSketch = null, thetaSketch = null, HLLSketchBuild = null
+ loadData(INDEX_TASK_WITHOUT_ROLLUP_FOR_PRESERVE_METRICS);
+ try (final Closeable ignored = unloader(fullDatasourceName)) {
+ final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
+ intervalsBeforeCompaction.sort(null);
+ // 2 segments across 1 days...
+ verifySegmentsCount(2);
+ ArrayList<Object> nullList = new ArrayList<Object>();
+ nullList.add(null);
+ Map<String, Object> queryAndResultFields = ImmutableMap.of(
+ "%%FIELD_TO_QUERY%%", "added",
+ "%%EXPECTED_COUNT_RESULT%%", 2,
+ "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(nullList)), ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(31))))
+ );
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+ queryAndResultFields = ImmutableMap.of(
+ "%%FIELD_TO_QUERY%%", "count",
+ "%%EXPECTED_COUNT_RESULT%%", 2,
+ "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(2))), ImmutableMap.of("events", ImmutableList.of(nullList)))
+ );
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+ queryAndResultFields = ImmutableMap.of(
+ "%%FIELD_TO_QUERY%%", "sum_added",
+ "%%EXPECTED_COUNT_RESULT%%", 2,
+ "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(62))), ImmutableMap.of("events", ImmutableList.of(nullList)))
+ );
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+ queryAndResultFields = ImmutableMap.of(
+ "%%QUANTILESRESULT%%", 2,
+ "%%THETARESULT%%", 2.0,
+ "%%HLLRESULT%%", 2
+ );
+ verifyQuery(INDEX_ROLLUP_SKETCH_QUERIES_RESOURCE, queryAndResultFields);
+
+ submitCompactionConfig(
+ MAX_ROWS_PER_SEGMENT_COMPACTED,
+ NO_SKIP_OFFSET,
+ new UserCompactionTaskGranularityConfig(null, null, true),
+ new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("language"))),
+ null,
+ new AggregatorFactory[]{
+ new CountAggregatorFactory("count"),
+ // FloatSumAggregator combine method takes in two Float but return Double
+ new FloatSumAggregatorFactory("sum_added", "added"),
+ new SketchMergeAggregatorFactory("thetaSketch", "user", 16384, true, false, null),
+ new HllSketchBuildAggregatorFactory("HLLSketchBuild", "user", 12, TgtHllType.HLL_4.name(), false),
+ new DoublesSketchAggregatorFactory("quantilesDoublesSketch", "delta", 128, 1000000000L)
+ },
+ false
+ );
+ // should now only have 1 row after compaction
+ // added = null, count = 3, sum_added = 93.0
+ forceTriggerAutoCompaction(1);
+
+ queryAndResultFields = ImmutableMap.of(
+ "%%FIELD_TO_QUERY%%", "added",
+ "%%EXPECTED_COUNT_RESULT%%", 1,
+ "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(nullList)))
+ );
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+ queryAndResultFields = ImmutableMap.of(
+ "%%FIELD_TO_QUERY%%", "count",
+ "%%EXPECTED_COUNT_RESULT%%", 1,
+ "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(3))))
+ );
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+ queryAndResultFields = ImmutableMap.of(
+ "%%FIELD_TO_QUERY%%", "sum_added",
+ "%%EXPECTED_COUNT_RESULT%%", 1,
+ "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(93.0f))))
+ );
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+ queryAndResultFields = ImmutableMap.of(
+ "%%QUANTILESRESULT%%", 3,
+ "%%THETARESULT%%", 3.0,
+ "%%HLLRESULT%%", 3
+ );
+ verifyQuery(INDEX_ROLLUP_SKETCH_QUERIES_RESOURCE, queryAndResultFields);
+
+ verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
+ checkCompactionIntervals(intervalsBeforeCompaction);
+
+ List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+ // Verify rollup segments does not get compacted again
+ forceTriggerAutoCompaction(1);
+ List<TaskResponseObject> compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+ Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size());
+ }
+ }
+
@Test
public void testAutoCompactionRowWithMetricAndRowWithoutMetricShouldPreserveExistingMetrics() throws Exception
{
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index a28a0a620f..f521c3913b 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -452,13 +452,13 @@ public class OnheapIncrementalIndex extends IncrementalIndex
@Override
public float getMetricFloatValue(int rowOffset, int aggOffset)
{
- return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getFloat);
+ return ((Number) getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getFloat)).floatValue();
}
@Override
public long getMetricLongValue(int rowOffset, int aggOffset)
{
- return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getLong);
+ return ((Number) getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getLong)).longValue();
}
@Override
@@ -470,7 +470,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
@Override
protected double getMetricDoubleValue(int rowOffset, int aggOffset)
{
- return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getDouble);
+ return ((Number) getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getDouble)).doubleValue();
}
@Override
@@ -544,7 +544,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
* If preserveExistingMetrics flag is set, then this method will combine values from two aggregators, the aggregator
* for aggregating from input into output field and the aggregator for combining already aggregated field, as needed
*/
- private <T> T getMetricHelper(AggregatorFactory[] metrics, Aggregator[] aggs, int aggOffset, Function<Aggregator, T> getMetricTypeFunction)
+ private <T> Object getMetricHelper(AggregatorFactory[] metrics, Aggregator[] aggs, int aggOffset, Function<Aggregator, T> getMetricTypeFunction)
{
if (preserveExistingMetrics) {
// Since the preserveExistingMetrics flag is set, we will have to check and possibly retrieve the aggregated values
@@ -564,7 +564,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
AggregatorFactory aggregatorFactory = metrics[aggOffset];
T aggregatedFromSource = getMetricTypeFunction.apply(aggs[aggOffset]);
T aggregatedFromCombined = getMetricTypeFunction.apply(aggs[aggOffset + metrics.length]);
- return (T) aggregatorFactory.combine(aggregatedFromSource, aggregatedFromCombined);
+ return aggregatorFactory.combine(aggregatedFromSource, aggregatedFromCombined);
}
} else {
// If preserveExistingMetrics flag is not set then we simply get metrics from the list of Aggregator, aggs, using the
diff --git a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
index 3257ec6d49..77309ff277 100644
--- a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
@@ -50,6 +50,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
+import org.apache.druid.query.aggregation.FloatSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.filter.BoundDimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
@@ -799,6 +800,74 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
}
}
+ @Test
+ public void testSchemaRollupWithRowWithExistingMetricsAndWithoutMetricUsingAggregatorWithDifferentReturnType() throws IndexSizeExceededException
+ {
+ AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{
+ new CountAggregatorFactory("count"),
+ // FloatSumAggregator combine method takes in two Float but return Double
+ new FloatSumAggregatorFactory("sum_of_x", "x")
+ };
+ final IncrementalIndex index = indexCreator.createIndex((Object) aggregatorFactories);
+ index.add(
+ new MapBasedInputRow(
+ 1481871600000L,
+ Arrays.asList("name", "host"),
+ ImmutableMap.of("name", "name1", "host", "host", "x", 2)
+ )
+ );
+ index.add(
+ new MapBasedInputRow(
+ 1481871600000L,
+ Arrays.asList("name", "host"),
+ ImmutableMap.of("name", "name1", "host", "host", "x", 3)
+ )
+ );
+ index.add(
+ new MapBasedInputRow(
+ 1481871600000L,
+ Arrays.asList("name", "host"),
+ ImmutableMap.of("name", "name1", "host", "host", "count", 2, "sum_of_x", 4)
+ )
+ );
+ index.add(
+ new MapBasedInputRow(
+ 1481871600000L,
+ Arrays.asList("name", "host"),
+ ImmutableMap.of("name", "name1", "host", "host", "count", 3, "sum_of_x", 5)
+ )
+ );
+
+ Assert.assertEquals(index.isRollup() ? 1 : 4, index.size());
+ Iterator<Row> iterator = index.iterator();
+ int rowCount = 0;
+ while (iterator.hasNext()) {
+ rowCount++;
+ Row row = iterator.next();
+ Assert.assertEquals(1481871600000L, row.getTimestampFromEpoch());
+ if (index.isRollup()) {
+ // All rows are rollup into one row
+ Assert.assertEquals(isPreserveExistingMetrics ? 7 : 4, row.getMetric("count").intValue());
+ Assert.assertEquals(isPreserveExistingMetrics ? 14 : 5, row.getMetric("sum_of_x").intValue());
+ } else {
+ // We still have 4 rows
+ if (rowCount == 1 || rowCount == 2) {
+ Assert.assertEquals(1, row.getMetric("count").intValue());
+ Assert.assertEquals(1 + rowCount, row.getMetric("sum_of_x").intValue());
+ } else {
+ if (isPreserveExistingMetrics) {
+ Assert.assertEquals(rowCount - 1, row.getMetric("count").intValue());
+ Assert.assertEquals(1 + rowCount, row.getMetric("sum_of_x").intValue());
+ } else {
+ Assert.assertEquals(1, row.getMetric("count").intValue());
+ // The rows does not have the dim "x", hence metric is null (useDefaultValueForNull=false) or 0 (useDefaultValueForNull=true)
+ Assert.assertEquals(NullHandling.sqlCompatible() ? null : 0.0f, row.getMetric("sum_of_x"));
+ }
+ }
+ }
+ }
+ }
+
@Test
public void testSchemaRollupWithRowWithOnlyExistingMetrics() throws IndexSizeExceededException
{
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java
index f35257e244..374f61c512 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java
@@ -79,13 +79,14 @@ public class ClientCompactionTaskQueryTuningConfig
public static ClientCompactionTaskQueryTuningConfig from(
@Nullable UserCompactionTaskQueryTuningConfig userCompactionTaskQueryTuningConfig,
- @Nullable Integer maxRowsPerSegment
+ @Nullable Integer maxRowsPerSegment,
+ @Nullable Boolean preserveExistingMetrics
)
{
if (userCompactionTaskQueryTuningConfig == null) {
return new ClientCompactionTaskQueryTuningConfig(
maxRowsPerSegment,
- new OnheapIncrementalIndex.Spec(true),
+ new OnheapIncrementalIndex.Spec(preserveExistingMetrics),
null,
null,
null,
@@ -107,7 +108,7 @@ public class ClientCompactionTaskQueryTuningConfig
} else {
AppendableIndexSpec appendableIndexSpecToUse = userCompactionTaskQueryTuningConfig.getAppendableIndexSpec() != null
? userCompactionTaskQueryTuningConfig.getAppendableIndexSpec()
- : new OnheapIncrementalIndex.Spec(true);
+ : new OnheapIncrementalIndex.Spec(preserveExistingMetrics);
return new ClientCompactionTaskQueryTuningConfig(
maxRowsPerSegment,
appendableIndexSpecToUse,
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index ddae02298b..c809e8168f 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -452,7 +452,7 @@ public class CompactSegments implements CoordinatorCustomDuty
"coordinator-issued",
segmentsToCompact,
config.getTaskPriority(),
- ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()),
+ ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), config.getMetricsSpec() != null),
granularitySpec,
dimensionsSpec,
config.getMetricsSpec(),
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
index 8e2f5f3d2c..4f2f1afca5 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
@@ -338,7 +338,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
{
Preconditions.checkState(!candidates.isEmpty(), "Empty candidates");
final ClientCompactionTaskQueryTuningConfig tuningConfig =
- ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment());
+ ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null);
final PartitionsSpec partitionsSpecFromConfig = findPartitionsSpecFromConfig(tuningConfig);
final CompactionState lastCompactionState = candidates.segments.get(0).getLastCompactionState();
if (lastCompactionState == null) {
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 2b91a89abb..08b32feff3 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -69,6 +69,7 @@ import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
@@ -1741,6 +1742,130 @@ public class CompactSegmentsTest
Assert.assertEquals(expected, actual);
}
+ @Test
+ public void testCompactWithMetricsSpecShouldSetPreserveExistingMetricsTrue()
+ {
+ final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+ final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
+ final String dataSource = DATA_SOURCE_PREFIX + 0;
+ compactionConfigs.add(
+ new DataSourceCompactionConfig(
+ dataSource,
+ 0,
+ 500L,
+ null,
+ new Period("PT0H"), // smaller than segment interval
+ new UserCompactionTaskQueryTuningConfig(
+ null,
+ null,
+ null,
+ null,
+ null,
+ partitionsSpec,
+ null,
+ null,
+ null,
+ null,
+ null,
+ 3,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ ),
+ null,
+ null,
+ new AggregatorFactory[] {new CountAggregatorFactory("cnt")},
+ null,
+ null,
+ null
+ )
+ );
+ doCompactSegments(compactSegments, compactionConfigs);
+ ArgumentCaptor<ClientCompactionTaskQueryTuningConfig> clientCompactionTaskQueryTuningConfigArgumentCaptor = ArgumentCaptor.forClass(
+ ClientCompactionTaskQueryTuningConfig.class);
+ Mockito.verify(mockIndexingServiceClient).compactSegments(
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.anyInt(),
+ clientCompactionTaskQueryTuningConfigArgumentCaptor.capture(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ );
+ Assert.assertNotNull(clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue());
+ Assert.assertNotNull(clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue().getAppendableIndexSpec());
+ Assert.assertTrue(((OnheapIncrementalIndex.Spec) clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue().getAppendableIndexSpec()).isPreserveExistingMetrics());
+ }
+
+ @Test
+ public void testCompactWithoutMetricsSpecShouldSetPreserveExistingMetricsFalse()
+ {
+ final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+ final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
+ final String dataSource = DATA_SOURCE_PREFIX + 0;
+ compactionConfigs.add(
+ new DataSourceCompactionConfig(
+ dataSource,
+ 0,
+ 500L,
+ null,
+ new Period("PT0H"), // smaller than segment interval
+ new UserCompactionTaskQueryTuningConfig(
+ null,
+ null,
+ null,
+ null,
+ null,
+ partitionsSpec,
+ null,
+ null,
+ null,
+ null,
+ null,
+ 3,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ ),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ )
+ );
+ doCompactSegments(compactSegments, compactionConfigs);
+ ArgumentCaptor<ClientCompactionTaskQueryTuningConfig> clientCompactionTaskQueryTuningConfigArgumentCaptor = ArgumentCaptor.forClass(
+ ClientCompactionTaskQueryTuningConfig.class);
+ Mockito.verify(mockIndexingServiceClient).compactSegments(
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.anyInt(),
+ clientCompactionTaskQueryTuningConfigArgumentCaptor.capture(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ );
+ Assert.assertNotNull(clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue());
+ Assert.assertNotNull(clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue().getAppendableIndexSpec());
+ Assert.assertFalse(((OnheapIncrementalIndex.Spec) clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue().getAppendableIndexSpec()).isPreserveExistingMetrics());
+ }
+
private void verifySnapshot(
CompactSegments compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus scheduleStatus,
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
index fd7549c629..cbcd1b906b 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
@@ -101,7 +101,7 @@ public class NewestSegmentFirstIteratorTest
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
- ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment())
+ ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
}
@@ -145,7 +145,7 @@ public class NewestSegmentFirstIteratorTest
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
- ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment())
+ ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
}
@@ -189,7 +189,7 @@ public class NewestSegmentFirstIteratorTest
Assert.assertEquals(
new DynamicPartitionsSpec(null, 1000L),
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
- ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment())
+ ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
}
@@ -233,7 +233,7 @@ public class NewestSegmentFirstIteratorTest
Assert.assertEquals(
new DynamicPartitionsSpec(100, 1000L),
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
- ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment())
+ ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
}
@@ -277,7 +277,7 @@ public class NewestSegmentFirstIteratorTest
Assert.assertEquals(
new DynamicPartitionsSpec(100, 1000L),
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
- ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment())
+ ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
}
@@ -321,7 +321,7 @@ public class NewestSegmentFirstIteratorTest
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
- ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment())
+ ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
}
@@ -365,7 +365,7 @@ public class NewestSegmentFirstIteratorTest
Assert.assertEquals(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
- ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment())
+ ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
}
@@ -409,7 +409,7 @@ public class NewestSegmentFirstIteratorTest
Assert.assertEquals(
new HashedPartitionsSpec(null, 10, ImmutableList.of("dim")),
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
- ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment())
+ ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
}
@@ -453,7 +453,7 @@ public class NewestSegmentFirstIteratorTest
Assert.assertEquals(
new SingleDimensionPartitionsSpec(10000, null, "dim", false),
NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
- ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment())
+ ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
)
);
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
index 2026873103..1b7570a972 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
@@ -700,7 +700,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
// Same partitionsSpec as what is set in the auto compaction config
- PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+ PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
@@ -733,7 +733,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
// Same partitionsSpec as what is set in the auto compaction config
- PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+ PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
@@ -766,7 +766,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
// Same partitionsSpec as what is set in the auto compaction config
- PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+ PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
@@ -809,7 +809,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
// Same partitionsSpec as what is set in the auto compaction config
- PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+ PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
@@ -852,7 +852,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
// Same partitionsSpec as what is set in the auto compaction config
- PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+ PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
@@ -904,7 +904,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
// Same partitionsSpec as what is set in the auto compaction config
- PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+ PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
@@ -955,7 +955,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
// Same partitionsSpec as what is set in the auto compaction config
- PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+ PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have
// rollup=false for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -1015,7 +1015,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
// Same partitionsSpec as what is set in the auto compaction config
- PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+ PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have
// queryGranularity=DAY for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -1075,7 +1075,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
// Same partitionsSpec as what is set in the auto compaction config
- PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+ PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have
// Dimensions=["foo", "bar"] for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -1174,7 +1174,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
// Same partitionsSpec as what is set in the auto compaction config
- PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+ PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have
// filter=SelectorDimFilter("dim1", "foo", null) for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -1298,7 +1298,7 @@ public class NewestSegmentFirstPolicyTest
// Same indexSpec as what is set in the auto compaction config
Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
// Same partitionsSpec as what is set in the auto compaction config
- PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+ PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have
// metricsSpec={CountAggregatorFactory("cnt")} for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -1447,7 +1447,7 @@ public class NewestSegmentFirstPolicyTest
// Different indexSpec as what is set in the auto compaction config
IndexSpec newIndexSpec = new IndexSpec(new ConciseBitmapSerdeFactory(), null, null, null);
Map<String, Object> newIndexSpecMap = mapper.convertValue(newIndexSpec, new TypeReference<Map<String, Object>>() {});
- PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+ PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
// Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
@@ -1496,7 +1496,7 @@ public class NewestSegmentFirstPolicyTest
public void testIteratorDoesNotReturnSegmentWithChangingAppendableIndexSpec()
{
NullHandling.initializeForTests();
- PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+ PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
new SegmentGenerateSpec(
Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org