You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ma...@apache.org on 2022/04/15 22:47:54 UTC

[druid] branch master updated: Fix bug in auto compaction preserveExistingMetrics feature (#12438)

This is an automated email from the ASF dual-hosted git repository.

maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c25a556827 Fix bug in auto compaction preserveExistingMetrics feature (#12438)
c25a556827 is described below

commit c25a5568275c506115914cc5622a829c89f8e384
Author: Maytas Monsereenusorn <ma...@apache.org>
AuthorDate: Fri Apr 15 15:47:47 2022 -0700

    Fix bug in auto compaction preserveExistingMetrics feature (#12438)
    
    * fix bug
    
    * fix test
    
    * fix IT
---
 .../coordinator/duty/ITAutoCompactionTest.java     |  96 ++++++++++++++++
 .../incremental/OnheapIncrementalIndex.java        |  10 +-
 .../druid/segment/data/IncrementalIndexTest.java   |  69 ++++++++++++
 .../ClientCompactionTaskQueryTuningConfig.java     |   7 +-
 .../server/coordinator/duty/CompactSegments.java   |   2 +-
 .../duty/NewestSegmentFirstIterator.java           |   2 +-
 .../coordinator/duty/CompactSegmentsTest.java      | 125 +++++++++++++++++++++
 .../duty/NewestSegmentFirstIteratorTest.java       |  18 +--
 .../duty/NewestSegmentFirstPolicyTest.java         |  26 ++---
 9 files changed, 323 insertions(+), 32 deletions(-)

diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index 7dd7591615..32d563f25b 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -45,6 +45,7 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.query.aggregation.FloatSumAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggregatorFactory;
 import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory;
@@ -121,6 +122,101 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
     fullDatasourceName = "wikipedia_index_test_" + UUID.randomUUID() + config.getExtraDatasourceNameSuffix();
   }
 
+  @Test
+  public void testAutoCompactionRowWithMetricAndRowWithoutMetricShouldPreserveExistingMetricsUsingAggregatorWithDifferentReturnType() throws Exception
+  {
+    // added = null, count = 2, sum_added = 62, quantilesDoublesSketch = 2, thetaSketch = 2, HLLSketchBuild = 2
+    loadData(INDEX_TASK_WITH_ROLLUP_FOR_PRESERVE_METRICS);
+    // added = 31, count = null, sum_added = null, quantilesDoublesSketch = null, thetaSketch = null, HLLSketchBuild = null
+    loadData(INDEX_TASK_WITHOUT_ROLLUP_FOR_PRESERVE_METRICS);
+    try (final Closeable ignored = unloader(fullDatasourceName)) {
+      final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
+      intervalsBeforeCompaction.sort(null);
+      // 2 segments across 1 day...
+      verifySegmentsCount(2);
+      ArrayList<Object> nullList = new ArrayList<Object>();
+      nullList.add(null);
+      Map<String, Object> queryAndResultFields = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "added",
+          "%%EXPECTED_COUNT_RESULT%%", 2,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(nullList)), ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(31))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+      queryAndResultFields = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "count",
+          "%%EXPECTED_COUNT_RESULT%%", 2,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(2))), ImmutableMap.of("events", ImmutableList.of(nullList)))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+      queryAndResultFields = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "sum_added",
+          "%%EXPECTED_COUNT_RESULT%%", 2,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(62))), ImmutableMap.of("events", ImmutableList.of(nullList)))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+      queryAndResultFields = ImmutableMap.of(
+          "%%QUANTILESRESULT%%", 2,
+          "%%THETARESULT%%", 2.0,
+          "%%HLLRESULT%%", 2
+      );
+      verifyQuery(INDEX_ROLLUP_SKETCH_QUERIES_RESOURCE, queryAndResultFields);
+
+      submitCompactionConfig(
+          MAX_ROWS_PER_SEGMENT_COMPACTED,
+          NO_SKIP_OFFSET,
+          new UserCompactionTaskGranularityConfig(null, null, true),
+          new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("language"))),
+          null,
+          new AggregatorFactory[]{
+              new CountAggregatorFactory("count"),
+              // FloatSumAggregator's combine method takes in two Floats but returns a Double
+              new FloatSumAggregatorFactory("sum_added", "added"),
+              new SketchMergeAggregatorFactory("thetaSketch", "user", 16384, true, false, null),
+              new HllSketchBuildAggregatorFactory("HLLSketchBuild", "user", 12, TgtHllType.HLL_4.name(), false),
+              new DoublesSketchAggregatorFactory("quantilesDoublesSketch", "delta", 128, 1000000000L)
+          },
+          false
+      );
+      // should now only have 1 row after compaction
+      // added = null, count = 3, sum_added = 93.0
+      forceTriggerAutoCompaction(1);
+
+      queryAndResultFields = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "added",
+          "%%EXPECTED_COUNT_RESULT%%", 1,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(nullList)))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+      queryAndResultFields = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "count",
+          "%%EXPECTED_COUNT_RESULT%%", 1,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(3))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+      queryAndResultFields = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "sum_added",
+          "%%EXPECTED_COUNT_RESULT%%", 1,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(93.0f))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+      queryAndResultFields = ImmutableMap.of(
+          "%%QUANTILESRESULT%%", 3,
+          "%%THETARESULT%%", 3.0,
+          "%%HLLRESULT%%", 3
+      );
+      verifyQuery(INDEX_ROLLUP_SKETCH_QUERIES_RESOURCE, queryAndResultFields);
+
+      verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
+      checkCompactionIntervals(intervalsBeforeCompaction);
+
+      List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+      // Verify that rolled-up segments do not get compacted again
+      forceTriggerAutoCompaction(1);
+      List<TaskResponseObject> compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+      Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size());
+    }
+  }
+
   @Test
   public void testAutoCompactionRowWithMetricAndRowWithoutMetricShouldPreserveExistingMetrics() throws Exception
   {
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index a28a0a620f..f521c3913b 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -452,13 +452,13 @@ public class OnheapIncrementalIndex extends IncrementalIndex
   @Override
   public float getMetricFloatValue(int rowOffset, int aggOffset)
   {
-    return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getFloat);
+    return ((Number) getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getFloat)).floatValue();
   }
 
   @Override
   public long getMetricLongValue(int rowOffset, int aggOffset)
   {
-    return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getLong);
+    return ((Number) getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getLong)).longValue();
   }
 
   @Override
@@ -470,7 +470,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
   @Override
   protected double getMetricDoubleValue(int rowOffset, int aggOffset)
   {
-    return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getDouble);
+    return ((Number) getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getDouble)).doubleValue();
   }
 
   @Override
@@ -544,7 +544,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
    * If preserveExistingMetrics flag is set, then this method will combine values from two aggregators, the aggregator
    * for aggregating from input into output field and the aggregator for combining already aggregated field, as needed
    */
-  private <T> T getMetricHelper(AggregatorFactory[] metrics, Aggregator[] aggs, int aggOffset, Function<Aggregator, T> getMetricTypeFunction)
+  private <T> Object getMetricHelper(AggregatorFactory[] metrics, Aggregator[] aggs, int aggOffset, Function<Aggregator, T> getMetricTypeFunction)
   {
     if (preserveExistingMetrics) {
       // Since the preserveExistingMetrics flag is set, we will have to check and possibly retrieve the aggregated values
@@ -564,7 +564,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
         AggregatorFactory aggregatorFactory = metrics[aggOffset];
         T aggregatedFromSource = getMetricTypeFunction.apply(aggs[aggOffset]);
         T aggregatedFromCombined = getMetricTypeFunction.apply(aggs[aggOffset + metrics.length]);
-        return (T) aggregatorFactory.combine(aggregatedFromSource, aggregatedFromCombined);
+        return aggregatorFactory.combine(aggregatedFromSource, aggregatedFromCombined);
       }
     } else {
       // If preserveExistingMetrics flag is not set then we simply get metrics from the list of Aggregator, aggs, using the
diff --git a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
index 3257ec6d49..77309ff277 100644
--- a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
@@ -50,6 +50,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
 import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
+import org.apache.druid.query.aggregation.FloatSumAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.filter.BoundDimFilter;
 import org.apache.druid.query.filter.SelectorDimFilter;
@@ -799,6 +800,74 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
     }
   }
 
+  @Test
+  public void testSchemaRollupWithRowWithExistingMetricsAndWithoutMetricUsingAggregatorWithDifferentReturnType() throws IndexSizeExceededException
+  {
+    AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{
+        new CountAggregatorFactory("count"),
+        // FloatSumAggregator's combine method takes in two Floats but returns a Double
+        new FloatSumAggregatorFactory("sum_of_x", "x")
+    };
+    final IncrementalIndex index = indexCreator.createIndex((Object) aggregatorFactories);
+    index.add(
+        new MapBasedInputRow(
+            1481871600000L,
+            Arrays.asList("name", "host"),
+            ImmutableMap.of("name", "name1", "host", "host", "x", 2)
+        )
+    );
+    index.add(
+        new MapBasedInputRow(
+            1481871600000L,
+            Arrays.asList("name", "host"),
+            ImmutableMap.of("name", "name1", "host", "host", "x", 3)
+        )
+    );
+    index.add(
+        new MapBasedInputRow(
+            1481871600000L,
+            Arrays.asList("name", "host"),
+            ImmutableMap.of("name", "name1", "host", "host", "count", 2, "sum_of_x", 4)
+        )
+    );
+    index.add(
+        new MapBasedInputRow(
+            1481871600000L,
+            Arrays.asList("name", "host"),
+            ImmutableMap.of("name", "name1", "host", "host", "count", 3, "sum_of_x", 5)
+        )
+    );
+
+    Assert.assertEquals(index.isRollup() ? 1 : 4, index.size());
+    Iterator<Row> iterator = index.iterator();
+    int rowCount = 0;
+    while (iterator.hasNext()) {
+      rowCount++;
+      Row row = iterator.next();
+      Assert.assertEquals(1481871600000L, row.getTimestampFromEpoch());
+      if (index.isRollup()) {
+        // All rows are rolled up into one row
+        Assert.assertEquals(isPreserveExistingMetrics ? 7 : 4, row.getMetric("count").intValue());
+        Assert.assertEquals(isPreserveExistingMetrics ? 14 : 5, row.getMetric("sum_of_x").intValue());
+      } else {
+        // We still have 4 rows
+        if (rowCount == 1 || rowCount == 2) {
+          Assert.assertEquals(1, row.getMetric("count").intValue());
+          Assert.assertEquals(1 + rowCount, row.getMetric("sum_of_x").intValue());
+        } else {
+          if (isPreserveExistingMetrics) {
+            Assert.assertEquals(rowCount - 1, row.getMetric("count").intValue());
+            Assert.assertEquals(1 + rowCount, row.getMetric("sum_of_x").intValue());
+          } else {
+            Assert.assertEquals(1, row.getMetric("count").intValue());
+            // The rows do not have the dim "x", hence the metric is null (useDefaultValueForNull=false) or 0 (useDefaultValueForNull=true)
+            Assert.assertEquals(NullHandling.sqlCompatible() ? null : 0.0f, row.getMetric("sum_of_x"));
+          }
+        }
+      }
+    }
+  }
+
   @Test
   public void testSchemaRollupWithRowWithOnlyExistingMetrics() throws IndexSizeExceededException
   {
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java
index f35257e244..374f61c512 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java
@@ -79,13 +79,14 @@ public class ClientCompactionTaskQueryTuningConfig
 
   public static ClientCompactionTaskQueryTuningConfig from(
       @Nullable UserCompactionTaskQueryTuningConfig userCompactionTaskQueryTuningConfig,
-      @Nullable Integer maxRowsPerSegment
+      @Nullable Integer maxRowsPerSegment,
+      @Nullable Boolean preserveExistingMetrics
   )
   {
     if (userCompactionTaskQueryTuningConfig == null) {
       return new ClientCompactionTaskQueryTuningConfig(
           maxRowsPerSegment,
-          new OnheapIncrementalIndex.Spec(true),
+          new OnheapIncrementalIndex.Spec(preserveExistingMetrics),
           null,
           null,
           null,
@@ -107,7 +108,7 @@ public class ClientCompactionTaskQueryTuningConfig
     } else {
       AppendableIndexSpec appendableIndexSpecToUse = userCompactionTaskQueryTuningConfig.getAppendableIndexSpec() != null
                                                      ? userCompactionTaskQueryTuningConfig.getAppendableIndexSpec()
-                                                     : new OnheapIncrementalIndex.Spec(true);
+                                                     : new OnheapIncrementalIndex.Spec(preserveExistingMetrics);
       return new ClientCompactionTaskQueryTuningConfig(
           maxRowsPerSegment,
           appendableIndexSpecToUse,
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index ddae02298b..c809e8168f 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -452,7 +452,7 @@ public class CompactSegments implements CoordinatorCustomDuty
             "coordinator-issued",
             segmentsToCompact,
             config.getTaskPriority(),
-            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()),
+            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), config.getMetricsSpec() != null),
             granularitySpec,
             dimensionsSpec,
             config.getMetricsSpec(),
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
index 8e2f5f3d2c..4f2f1afca5 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
@@ -338,7 +338,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
   {
     Preconditions.checkState(!candidates.isEmpty(), "Empty candidates");
     final ClientCompactionTaskQueryTuningConfig tuningConfig =
-        ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment());
+        ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null);
     final PartitionsSpec partitionsSpecFromConfig = findPartitionsSpecFromConfig(tuningConfig);
     final CompactionState lastCompactionState = candidates.segments.get(0).getLastCompactionState();
     if (lastCompactionState == null) {
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 2b91a89abb..08b32feff3 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -69,6 +69,7 @@ import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
@@ -1741,6 +1742,130 @@ public class CompactSegmentsTest
     Assert.assertEquals(expected, actual);
   }
 
+  @Test
+  public void testCompactWithMetricsSpecShouldSetPreserveExistingMetricsTrue()
+  {
+    final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+    final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
+    final String dataSource = DATA_SOURCE_PREFIX + 0;
+    compactionConfigs.add(
+        new DataSourceCompactionConfig(
+            dataSource,
+            0,
+            500L,
+            null,
+            new Period("PT0H"), // smaller than segment interval
+            new UserCompactionTaskQueryTuningConfig(
+                null,
+                null,
+                null,
+                null,
+                null,
+                partitionsSpec,
+                null,
+                null,
+                null,
+                null,
+                null,
+                3,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
+            ),
+            null,
+            null,
+            new AggregatorFactory[] {new CountAggregatorFactory("cnt")},
+            null,
+            null,
+            null
+        )
+    );
+    doCompactSegments(compactSegments, compactionConfigs);
+    ArgumentCaptor<ClientCompactionTaskQueryTuningConfig> clientCompactionTaskQueryTuningConfigArgumentCaptor = ArgumentCaptor.forClass(
+        ClientCompactionTaskQueryTuningConfig.class);
+    Mockito.verify(mockIndexingServiceClient).compactSegments(
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.anyInt(),
+        clientCompactionTaskQueryTuningConfigArgumentCaptor.capture(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any()
+    );
+    Assert.assertNotNull(clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue());
+    Assert.assertNotNull(clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue().getAppendableIndexSpec());
+    Assert.assertTrue(((OnheapIncrementalIndex.Spec) clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue().getAppendableIndexSpec()).isPreserveExistingMetrics());
+  }
+
+  @Test
+  public void testCompactWithoutMetricsSpecShouldSetPreserveExistingMetricsFalse()
+  {
+    final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+    final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
+    final String dataSource = DATA_SOURCE_PREFIX + 0;
+    compactionConfigs.add(
+        new DataSourceCompactionConfig(
+            dataSource,
+            0,
+            500L,
+            null,
+            new Period("PT0H"), // smaller than segment interval
+            new UserCompactionTaskQueryTuningConfig(
+                null,
+                null,
+                null,
+                null,
+                null,
+                partitionsSpec,
+                null,
+                null,
+                null,
+                null,
+                null,
+                3,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
+            ),
+            null,
+            null,
+            null,
+            null,
+            null,
+            null
+        )
+    );
+    doCompactSegments(compactSegments, compactionConfigs);
+    ArgumentCaptor<ClientCompactionTaskQueryTuningConfig> clientCompactionTaskQueryTuningConfigArgumentCaptor = ArgumentCaptor.forClass(
+        ClientCompactionTaskQueryTuningConfig.class);
+    Mockito.verify(mockIndexingServiceClient).compactSegments(
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.anyInt(),
+        clientCompactionTaskQueryTuningConfigArgumentCaptor.capture(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any()
+    );
+    Assert.assertNotNull(clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue());
+    Assert.assertNotNull(clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue().getAppendableIndexSpec());
+    Assert.assertFalse(((OnheapIncrementalIndex.Spec) clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue().getAppendableIndexSpec()).isPreserveExistingMetrics());
+  }
+
   private void verifySnapshot(
       CompactSegments compactSegments,
       AutoCompactionSnapshot.AutoCompactionScheduleStatus scheduleStatus,
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
index fd7549c629..cbcd1b906b 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
@@ -101,7 +101,7 @@ public class NewestSegmentFirstIteratorTest
     Assert.assertEquals(
         new DynamicPartitionsSpec(null, Long.MAX_VALUE),
         NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
-            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment())
+            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
         )
     );
   }
@@ -145,7 +145,7 @@ public class NewestSegmentFirstIteratorTest
     Assert.assertEquals(
         new DynamicPartitionsSpec(null, Long.MAX_VALUE),
         NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
-            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment())
+            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
         )
     );
   }
@@ -189,7 +189,7 @@ public class NewestSegmentFirstIteratorTest
     Assert.assertEquals(
         new DynamicPartitionsSpec(null, 1000L),
         NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
-            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment())
+            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
         )
     );
   }
@@ -233,7 +233,7 @@ public class NewestSegmentFirstIteratorTest
     Assert.assertEquals(
         new DynamicPartitionsSpec(100, 1000L),
         NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
-            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment())
+            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
         )
     );
   }
@@ -277,7 +277,7 @@ public class NewestSegmentFirstIteratorTest
     Assert.assertEquals(
         new DynamicPartitionsSpec(100, 1000L),
         NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
-            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment())
+            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
         )
     );
   }
@@ -321,7 +321,7 @@ public class NewestSegmentFirstIteratorTest
     Assert.assertEquals(
         new DynamicPartitionsSpec(null, Long.MAX_VALUE),
         NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
-            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment())
+            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
         )
     );
   }
@@ -365,7 +365,7 @@ public class NewestSegmentFirstIteratorTest
     Assert.assertEquals(
         new DynamicPartitionsSpec(null, Long.MAX_VALUE),
         NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
-            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment())
+            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
         )
     );
   }
@@ -409,7 +409,7 @@ public class NewestSegmentFirstIteratorTest
     Assert.assertEquals(
         new HashedPartitionsSpec(null, 10, ImmutableList.of("dim")),
         NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
-            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment())
+            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
         )
     );
   }
@@ -453,7 +453,7 @@ public class NewestSegmentFirstIteratorTest
     Assert.assertEquals(
         new SingleDimensionPartitionsSpec(10000, null, "dim", false),
         NewestSegmentFirstIterator.findPartitionsSpecFromConfig(
-            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment())
+            ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null)
         )
     );
   }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
index 2026873103..1b7570a972 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
@@ -700,7 +700,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
 
     // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
     final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
@@ -733,7 +733,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
 
     // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
     final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
@@ -766,7 +766,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
 
     // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
     final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
@@ -809,7 +809,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
 
     // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
     final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
@@ -852,7 +852,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
 
     // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
     final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
@@ -904,7 +904,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
 
     // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
     final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
@@ -955,7 +955,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
 
     // Create segments that were compacted (CompactionState != null) and have
     // rollup=false for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -1015,7 +1015,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
 
     // Create segments that were compacted (CompactionState != null) and have
     // queryGranularity=DAY for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -1075,7 +1075,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
 
     // Create segments that were compacted (CompactionState != null) and have
     // Dimensions=["foo", "bar"] for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -1174,7 +1174,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
 
     // Create segments that were compacted (CompactionState != null) and have
     // filter=SelectorDimFilter("dim1", "foo", null) for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -1298,7 +1298,7 @@ public class NewestSegmentFirstPolicyTest
     // Same indexSpec as what is set in the auto compaction config
     Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
     // Same partitionsSpec as what is set in the auto compaction config
-    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
 
     // Create segments that were compacted (CompactionState != null) and have
     // metricsSpec={CountAggregatorFactory("cnt")} for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
@@ -1447,7 +1447,7 @@ public class NewestSegmentFirstPolicyTest
     // Different indexSpec as what is set in the auto compaction config
     IndexSpec newIndexSpec = new IndexSpec(new ConciseBitmapSerdeFactory(), null, null, null);
     Map<String, Object> newIndexSpecMap = mapper.convertValue(newIndexSpec, new TypeReference<Map<String, Object>>() {});
-    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
 
     // Create segments that were compacted (CompactionState != null) and have segmentGranularity=DAY
     final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
@@ -1496,7 +1496,7 @@ public class NewestSegmentFirstPolicyTest
   public void testIteratorDoesNotReturnSegmentWithChangingAppendableIndexSpec()
   {
     NullHandling.initializeForTests();
-    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null, null));
     final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
         new SegmentGenerateSpec(
             Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org