You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by su...@apache.org on 2023/02/27 15:57:12 UTC

[druid] branch master updated: Make CompactionSearchPolicy injectable (#13842)

This is an automated email from the ASF dual-hosted git repository.

suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 31c7de1087 Make CompactionSearchPolicy injectable (#13842)
31c7de1087 is described below

commit 31c7de1087a7b0ce096aed78fd472d1e8821892c
Author: Suneet Saldanha <su...@apache.org>
AuthorDate: Mon Feb 27 07:57:03 2023 -0800

    Make CompactionSearchPolicy injectable (#13842)
    
    * Make CompactionSearchPolicy injectable
    
    A small refactoring that makes the search policy for compaction injectable.
    
    Future changes can introduce new search policies that can be configured and
    injected so that operators can choose which search policy is best suited for
    their cluster.
    
    This will also allow us to de-couple the scheduling of compaction jobs from
    the CompactSegments duty, allowing the co-ordinator to schedule compaction
    jobs faster than the duty lifecycle.
    
    This PR is made so that it easy to review the future changes.
    
    * fix tests
---
 .../druid/server/coordinator/DruidCoordinator.java | 15 +++---
 .../server/coordinator/duty/CompactSegments.java   |  4 +-
 .../coordinator/duty/NewestSegmentFirstPolicy.java |  2 +
 .../server/coordinator/DruidCoordinatorTest.java   |  9 ++--
 .../coordinator/duty/CompactSegmentsTest.java      | 57 +++++++++++-----------
 .../simulate/CoordinatorSimulationBuilder.java     |  7 ++-
 .../java/org/apache/druid/cli/CliCoordinator.java  |  5 ++
 7 files changed, 56 insertions(+), 43 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 38fcd9efa5..a5912a765c 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -65,6 +65,7 @@ import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordinator.duty.BalanceSegments;
 import org.apache.druid.server.coordinator.duty.CompactSegments;
+import org.apache.druid.server.coordinator.duty.CompactionSegmentSearchPolicy;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
 import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
@@ -150,7 +151,6 @@ public class DruidCoordinator
   private final BalancerStrategyFactory factory;
   private final LookupCoordinatorManager lookupCoordinatorManager;
   private final DruidLeaderSelector coordLeaderSelector;
-  private final ObjectMapper objectMapper;
   private final CompactSegments compactSegments;
 
   private volatile boolean started = false;
@@ -184,7 +184,7 @@ public class DruidCoordinator
       BalancerStrategyFactory factory,
       LookupCoordinatorManager lookupCoordinatorManager,
       @Coordinator DruidLeaderSelector coordLeaderSelector,
-      ObjectMapper objectMapper
+      CompactionSegmentSearchPolicy compactionSegmentSearchPolicy
   )
   {
     this(
@@ -206,7 +206,7 @@ public class DruidCoordinator
         factory,
         lookupCoordinatorManager,
         coordLeaderSelector,
-        objectMapper
+        compactionSegmentSearchPolicy
     );
   }
 
@@ -229,7 +229,7 @@ public class DruidCoordinator
       BalancerStrategyFactory factory,
       LookupCoordinatorManager lookupCoordinatorManager,
       DruidLeaderSelector coordLeaderSelector,
-      ObjectMapper objectMapper
+      CompactionSegmentSearchPolicy compactionSegmentSearchPolicy
   )
   {
     this.config = config;
@@ -253,8 +253,7 @@ public class DruidCoordinator
     this.factory = factory;
     this.lookupCoordinatorManager = lookupCoordinatorManager;
     this.coordLeaderSelector = coordLeaderSelector;
-    this.objectMapper = objectMapper;
-    this.compactSegments = initializeCompactSegmentsDuty();
+    this.compactSegments = initializeCompactSegmentsDuty(compactionSegmentSearchPolicy);
   }
 
   public boolean isLeader()
@@ -790,11 +789,11 @@ public class DruidCoordinator
   }
 
   @VisibleForTesting
-  CompactSegments initializeCompactSegmentsDuty()
+  CompactSegments initializeCompactSegmentsDuty(CompactionSegmentSearchPolicy compactionSegmentSearchPolicy)
   {
     List<CompactSegments> compactSegmentsDutyFromCustomGroups = getCompactSegmentsDutyFromCustomGroups();
     if (compactSegmentsDutyFromCustomGroups.isEmpty()) {
-      return new CompactSegments(config, objectMapper, indexingServiceClient);
+      return new CompactSegments(config, compactionSegmentSearchPolicy, indexingServiceClient);
     } else {
       if (compactSegmentsDutyFromCustomGroups.size() > 1) {
         log.warn("More than one compactSegments duty is configured in the Coordinator Custom Duty Group. The first duty will be picked up.");
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 153b573eb1..47456a28db 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -94,11 +94,11 @@ public class CompactSegments implements CoordinatorCustomDuty
   @JsonCreator
   public CompactSegments(
       @JacksonInject DruidCoordinatorConfig config,
-      @JacksonInject ObjectMapper objectMapper,
+      @JacksonInject CompactionSegmentSearchPolicy policy,
       @JacksonInject IndexingServiceClient indexingServiceClient
   )
   {
-    this.policy = new NewestSegmentFirstPolicy(objectMapper);
+    this.policy = policy;
     this.indexingServiceClient = indexingServiceClient;
     this.skipLockedIntervals = config.getCompactionSkipLockedIntervals();
     autoCompactionSnapshotPerDataSource.set(new HashMap<>());
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java
index 8306fab85e..ce4f0e1066 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java
@@ -20,6 +20,7 @@
 package org.apache.druid.server.coordinator.duty;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.timeline.SegmentTimeline;
 import org.joda.time.Interval;
@@ -34,6 +35,7 @@ public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy
 {
   private final ObjectMapper objectMapper;
 
+  @Inject
   public NewestSegmentFirstPolicy(ObjectMapper objectMapper)
   {
     this.objectMapper = objectMapper;
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index a0502d9544..4076ce9d3a 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -59,6 +59,7 @@ import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
 import org.apache.druid.server.coordinator.duty.EmitClusterStatsAndMetrics;
 import org.apache.druid.server.coordinator.duty.KillSupervisorsCustomDuty;
 import org.apache.druid.server.coordinator.duty.LogUsedSegments;
+import org.apache.druid.server.coordinator.duty.NewestSegmentFirstPolicy;
 import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
 import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
 import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
@@ -111,6 +112,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
   private DruidCoordinatorConfig druidCoordinatorConfig;
   private ObjectMapper objectMapper;
   private DruidNode druidNode;
+  private NewestSegmentFirstPolicy newestSegmentFirstPolicy;
   private final LatchableServiceEmitter serviceEmitter = new LatchableServiceEmitter();
 
   @Before
@@ -144,6 +146,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
     curator.blockUntilConnected();
     curator.create().creatingParentsIfNeeded().forPath(LOADPATH);
     objectMapper = new DefaultObjectMapper();
+    newestSegmentFirstPolicy = new NewestSegmentFirstPolicy(objectMapper);
     druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder()
         .withCoordinatorStartDelay(new Duration(COORDINATOR_START_DELAY))
         .withCoordinatorPeriod(new Duration(COORDINATOR_PERIOD))
@@ -779,7 +782,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
     Assert.assertTrue(compactSegmentsDutyFromCustomGroups.isEmpty());
 
     // CompactSegments returned by this method should be created using the DruidCoordinatorConfig in the DruidCoordinator
-    CompactSegments duty = coordinator.initializeCompactSegmentsDuty();
+    CompactSegments duty = coordinator.initializeCompactSegmentsDuty(newestSegmentFirstPolicy);
     Assert.assertNotNull(duty);
     Assert.assertEquals(druidCoordinatorConfig.getCompactionSkipLockedIntervals(), duty.isSkipLockedIntervals());
   }
@@ -819,7 +822,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
     Assert.assertTrue(compactSegmentsDutyFromCustomGroups.isEmpty());
 
     // CompactSegments returned by this method should be created using the DruidCoordinatorConfig in the DruidCoordinator
-    CompactSegments duty = coordinator.initializeCompactSegmentsDuty();
+    CompactSegments duty = coordinator.initializeCompactSegmentsDuty(newestSegmentFirstPolicy);
     Assert.assertNotNull(duty);
     Assert.assertEquals(druidCoordinatorConfig.getCompactionSkipLockedIntervals(), duty.isSkipLockedIntervals());
   }
@@ -870,7 +873,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
     Assert.assertTrue(compactSegmentsDutyFromCustomGroups.get(0) instanceof CompactSegments);
 
     // CompactSegments returned by this method should be from the Custom Duty Group
-    CompactSegments duty = coordinator.initializeCompactSegmentsDuty();
+    CompactSegments duty = coordinator.initializeCompactSegmentsDuty(newestSegmentFirstPolicy);
     Assert.assertNotNull(duty);
     Assert.assertNotEquals(druidCoordinatorConfig.getCompactionSkipLockedIntervals(), duty.isSkipLockedIntervals());
     // We should get the CompactSegment from the custom duty group which was created with a different config than the config in DruidCoordinator
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index b0a92fb7fb..510689c9ee 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -139,6 +139,7 @@ public class CompactSegmentsTest
   private static final int TOTAL_SEGMENT_PER_DATASOURCE = 44;
   private static final int TOTAL_INTERVAL_PER_DATASOURCE = 11;
   private static final int MAXIMUM_CAPACITY_WITH_AUTO_SCALE = 10;
+  private static final NewestSegmentFirstPolicy SEARCH_POLICY = new NewestSegmentFirstPolicy(JSON_MAPPER);
 
   @Parameterized.Parameters(name = "{0}")
   public static Collection<Object[]> constructorFeeder()
@@ -250,11 +251,11 @@ public class CompactSegmentsTest
     JSON_MAPPER.setInjectableValues(
         new InjectableValues.Std()
             .addValue(DruidCoordinatorConfig.class, COORDINATOR_CONFIG)
-            .addValue(ObjectMapper.class, JSON_MAPPER)
             .addValue(IndexingServiceClient.class, indexingServiceClient)
+            .addValue(CompactionSegmentSearchPolicy.class, SEARCH_POLICY)
     );
 
-    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient);
     String compactSegmentString = JSON_MAPPER.writeValueAsString(compactSegments);
     CompactSegments serdeCompactSegments = JSON_MAPPER.readValue(compactSegmentString, CompactSegments.class);
 
@@ -269,7 +270,7 @@ public class CompactSegmentsTest
     final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
     leaderClient.start();
     final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
-    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient);
 
     final Supplier<String> expectedVersionSupplier = new Supplier<String>()
     {
@@ -347,7 +348,7 @@ public class CompactSegmentsTest
     final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
     leaderClient.start();
     final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
-    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient);
 
     // Before any compaction, we do not have any snapshot of compactions
     Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
@@ -446,7 +447,7 @@ public class CompactSegmentsTest
     final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
     leaderClient.start();
     final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
-    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient);
 
     // Before any compaction, we do not have any snapshot of compactions
     Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
@@ -510,7 +511,7 @@ public class CompactSegmentsTest
     final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
     leaderClient.start();
     final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
-    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient);
 
     // Before any compaction, we do not have any snapshot of compactions
     Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
@@ -606,7 +607,7 @@ public class CompactSegmentsTest
     final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
     leaderClient.start();
     final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
-    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient);
 
     // Before any compaction, we do not have any snapshot of compactions
     Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
@@ -667,7 +668,7 @@ public class CompactSegmentsTest
     final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
     leaderClient.start();
     final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
-    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient);
 
     final CoordinatorStats stats = doCompactSegments(compactSegments, 3);
     Assert.assertEquals(3, stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT));
@@ -683,7 +684,7 @@ public class CompactSegmentsTest
     final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
     leaderClient.start();
     final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
-    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient);
     final CoordinatorStats stats = doCompactSegments(compactSegments, createCompactionConfigs(), maxCompactionSlot, true);
     Assert.assertEquals(maxCompactionSlot, stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT));
     Assert.assertEquals(maxCompactionSlot, stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT));
@@ -698,7 +699,7 @@ public class CompactSegmentsTest
     final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
     leaderClient.start();
     final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
-    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient);
     final CoordinatorStats stats = doCompactSegments(compactSegments, createCompactionConfigs(), maxCompactionSlot, true);
     Assert.assertEquals(MAXIMUM_CAPACITY_WITH_AUTO_SCALE, stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT));
     Assert.assertEquals(MAXIMUM_CAPACITY_WITH_AUTO_SCALE, stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT));
@@ -709,7 +710,7 @@ public class CompactSegmentsTest
   public void testCompactWithoutGranularitySpec()
   {
     final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
-    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
     final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
     final String dataSource = DATA_SOURCE_PREFIX + 0;
     compactionConfigs.add(
@@ -775,7 +776,7 @@ public class CompactSegmentsTest
   public void testCompactWithNotNullIOConfig()
   {
     final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
-    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
     final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
     final String dataSource = DATA_SOURCE_PREFIX + 0;
     compactionConfigs.add(
@@ -835,7 +836,7 @@ public class CompactSegmentsTest
   public void testCompactWithNullIOConfig()
   {
     final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
-    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
     final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
     final String dataSource = DATA_SOURCE_PREFIX + 0;
     compactionConfigs.add(
@@ -895,7 +896,7 @@ public class CompactSegmentsTest
   public void testCompactWithGranularitySpec()
   {
     final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
-    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
     final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
     final String dataSource = DATA_SOURCE_PREFIX + 0;
     compactionConfigs.add(
@@ -963,7 +964,7 @@ public class CompactSegmentsTest
   public void testCompactWithDimensionSpec()
   {
     final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
-    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
     final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
     final String dataSource = DATA_SOURCE_PREFIX + 0;
     compactionConfigs.add(
@@ -1026,7 +1027,7 @@ public class CompactSegmentsTest
   public void testCompactWithoutDimensionSpec()
   {
     final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
-    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
     final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
     final String dataSource = DATA_SOURCE_PREFIX + 0;
     compactionConfigs.add(
@@ -1088,7 +1089,7 @@ public class CompactSegmentsTest
   public void testCompactWithRollupInGranularitySpec()
   {
     final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
-    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
     final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
     final String dataSource = DATA_SOURCE_PREFIX + 0;
     compactionConfigs.add(
@@ -1192,7 +1193,7 @@ public class CompactSegmentsTest
     Mockito.when(mockIndexingServiceClient.getActiveTasks()).thenReturn(ImmutableList.of(runningConflictCompactionTask));
     Mockito.when(mockIndexingServiceClient.getTaskPayload(ArgumentMatchers.eq(conflictTaskId))).thenReturn(runningConflictCompactionTaskPayload);
 
-    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
     final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
     compactionConfigs.add(
         new DataSourceCompactionConfig(
@@ -1266,7 +1267,7 @@ public class CompactSegmentsTest
     final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
     leaderClient.start();
     final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
-    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient);
 
     final CoordinatorStats stats = doCompactSegments(compactSegments, createCompactionConfigs(2), 4);
     Assert.assertEquals(4, stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT));
@@ -1300,7 +1301,7 @@ public class CompactSegmentsTest
 
     // Verify that locked intervals are skipped and only one compaction task
     // is submitted for dataSource_0
-    CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+    CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient);
     final CoordinatorStats stats = doCompactSegments(compactSegments, createCompactionConfigs(2), 4);
     Assert.assertEquals(1, stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT));
     Assert.assertEquals(1, leaderClient.submittedCompactionTasks.size());
@@ -1318,7 +1319,7 @@ public class CompactSegmentsTest
   {
     NullHandling.initializeForTests();
     final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
-    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
     final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
     final String dataSource = DATA_SOURCE_PREFIX + 0;
     compactionConfigs.add(
@@ -1381,7 +1382,7 @@ public class CompactSegmentsTest
   public void testCompactWithoutCustomSpecs()
   {
     final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
-    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
     final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
     final String dataSource = DATA_SOURCE_PREFIX + 0;
     compactionConfigs.add(
@@ -1448,7 +1449,7 @@ public class CompactSegmentsTest
     NullHandling.initializeForTests();
     AggregatorFactory[] aggregatorFactories = new AggregatorFactory[] {new CountAggregatorFactory("cnt")};
     final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
-    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
     final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
     final String dataSource = DATA_SOURCE_PREFIX + 0;
     compactionConfigs.add(
@@ -1532,7 +1533,7 @@ public class CompactSegmentsTest
         .add(Intervals.of("2017/2018"));
 
     // Verify that no locked intervals are skipped
-    CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+    CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient);
     int maxTaskSlots = partitionsSpec instanceof SingleDimensionPartitionsSpec ? 5 : 3;
     final CoordinatorStats stats = doCompactSegments(compactSegments, createCompactionConfigs(1), maxTaskSlots);
     Assert.assertEquals(3, stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT));
@@ -1597,7 +1598,7 @@ public class CompactSegmentsTest
 
 
     final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
-    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
     final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
     compactionConfigs.add(
         new DataSourceCompactionConfig(
@@ -1695,7 +1696,7 @@ public class CompactSegmentsTest
 
 
     final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
-    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
     final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
     compactionConfigs.add(
         new DataSourceCompactionConfig(
@@ -1760,7 +1761,7 @@ public class CompactSegmentsTest
   public void testCompactWithMetricsSpecShouldSetPreserveExistingMetricsTrue()
   {
     final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
-    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
     final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
     final String dataSource = DATA_SOURCE_PREFIX + 0;
     compactionConfigs.add(
@@ -1823,7 +1824,7 @@ public class CompactSegmentsTest
   public void testCompactWithoutMetricsSpecShouldSetPreserveExistingMetricsFalse()
   {
     final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
-    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
     final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
     final String dataSource = DATA_SOURCE_PREFIX + 0;
     compactionConfigs.add(
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
index 7066183c3a..054d34422d 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
@@ -47,7 +47,9 @@ import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
 import org.apache.druid.server.coordinator.LoadQueueTaskMaster;
 import org.apache.druid.server.coordinator.RandomBalancerStrategyFactory;
 import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.duty.CompactionSegmentSearchPolicy;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
+import org.apache.druid.server.coordinator.duty.NewestSegmentFirstPolicy;
 import org.apache.druid.server.coordinator.rules.Rule;
 import org.apache.druid.server.lookup.cache.LookupCoordinatorManager;
 import org.apache.druid.timeline.DataSegment;
@@ -78,7 +80,8 @@ public class CoordinatorSimulationBuilder
               DataSegment.PruneSpecsHolder.DEFAULT
           )
       );
-
+  private static final CompactionSegmentSearchPolicy COMPACTION_SEGMENT_SEARCH_POLICY =
+      new NewestSegmentFirstPolicy(OBJECT_MAPPER);
   private String balancerStrategy;
   private CoordinatorDynamicConfig dynamicConfig =
       CoordinatorDynamicConfig.builder()
@@ -210,7 +213,7 @@ public class CoordinatorSimulationBuilder
         createBalancerStrategy(env),
         env.lookupCoordinatorManager,
         env.leaderSelector,
-        OBJECT_MAPPER
+        COMPACTION_SEGMENT_SEARCH_POLICY
     );
 
     return new SimulationImpl(coordinator, env);
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index 841034e2be..5684c3871c 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -83,6 +83,7 @@ import org.apache.druid.server.coordinator.DruidCoordinator;
 import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
 import org.apache.druid.server.coordinator.KillStalePendingSegments;
 import org.apache.druid.server.coordinator.LoadQueueTaskMaster;
+import org.apache.druid.server.coordinator.duty.CompactionSegmentSearchPolicy;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
@@ -93,6 +94,7 @@ import org.apache.druid.server.coordinator.duty.KillDatasourceMetadata;
 import org.apache.druid.server.coordinator.duty.KillRules;
 import org.apache.druid.server.coordinator.duty.KillSupervisors;
 import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
+import org.apache.druid.server.coordinator.duty.NewestSegmentFirstPolicy;
 import org.apache.druid.server.http.ClusterResource;
 import org.apache.druid.server.http.CompactionResource;
 import org.apache.druid.server.http.CoordinatorCompactionConfigsResource;
@@ -312,6 +314,9 @@ public class CliCoordinator extends ServerRunnable
                 KillCompactionConfig.class
             );
 
+            //TODO: make this configurable when there are multiple search policies
+            binder.bind(CompactionSegmentSearchPolicy.class).to(NewestSegmentFirstPolicy.class);
+
             bindAnnouncer(
                 binder,
                 Coordinator.class,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org