You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by su...@apache.org on 2023/02/27 15:57:12 UTC
[druid] branch master updated: Make CompactionSearchPolicy injectable (#13842)
This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 31c7de1087 Make CompactionSearchPolicy injectable (#13842)
31c7de1087 is described below
commit 31c7de1087a7b0ce096aed78fd472d1e8821892c
Author: Suneet Saldanha <su...@apache.org>
AuthorDate: Mon Feb 27 07:57:03 2023 -0800
Make CompactionSearchPolicy injectable (#13842)
* Make CompactionSearchPolicy injectable
A small refactoring that makes the search policy for compaction injectable.
Future changes can introduce new search policies that can be configured and
injected so that operators can choose which search policy is best suited for
their cluster.
This will also allow us to de-couple the scheduling of compaction jobs from
the CompactSegments duty, allowing the co-ordinator to schedule compaction
jobs faster than the duty lifecycle.
This PR is made so that it easy to review the future changes.
* fix tests
---
.../druid/server/coordinator/DruidCoordinator.java | 15 +++---
.../server/coordinator/duty/CompactSegments.java | 4 +-
.../coordinator/duty/NewestSegmentFirstPolicy.java | 2 +
.../server/coordinator/DruidCoordinatorTest.java | 9 ++--
.../coordinator/duty/CompactSegmentsTest.java | 57 +++++++++++-----------
.../simulate/CoordinatorSimulationBuilder.java | 7 ++-
.../java/org/apache/druid/cli/CliCoordinator.java | 5 ++
7 files changed, 56 insertions(+), 43 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 38fcd9efa5..a5912a765c 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -65,6 +65,7 @@ import org.apache.druid.query.DruidMetrics;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.duty.BalanceSegments;
import org.apache.druid.server.coordinator.duty.CompactSegments;
+import org.apache.druid.server.coordinator.duty.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
@@ -150,7 +151,6 @@ public class DruidCoordinator
private final BalancerStrategyFactory factory;
private final LookupCoordinatorManager lookupCoordinatorManager;
private final DruidLeaderSelector coordLeaderSelector;
- private final ObjectMapper objectMapper;
private final CompactSegments compactSegments;
private volatile boolean started = false;
@@ -184,7 +184,7 @@ public class DruidCoordinator
BalancerStrategyFactory factory,
LookupCoordinatorManager lookupCoordinatorManager,
@Coordinator DruidLeaderSelector coordLeaderSelector,
- ObjectMapper objectMapper
+ CompactionSegmentSearchPolicy compactionSegmentSearchPolicy
)
{
this(
@@ -206,7 +206,7 @@ public class DruidCoordinator
factory,
lookupCoordinatorManager,
coordLeaderSelector,
- objectMapper
+ compactionSegmentSearchPolicy
);
}
@@ -229,7 +229,7 @@ public class DruidCoordinator
BalancerStrategyFactory factory,
LookupCoordinatorManager lookupCoordinatorManager,
DruidLeaderSelector coordLeaderSelector,
- ObjectMapper objectMapper
+ CompactionSegmentSearchPolicy compactionSegmentSearchPolicy
)
{
this.config = config;
@@ -253,8 +253,7 @@ public class DruidCoordinator
this.factory = factory;
this.lookupCoordinatorManager = lookupCoordinatorManager;
this.coordLeaderSelector = coordLeaderSelector;
- this.objectMapper = objectMapper;
- this.compactSegments = initializeCompactSegmentsDuty();
+ this.compactSegments = initializeCompactSegmentsDuty(compactionSegmentSearchPolicy);
}
public boolean isLeader()
@@ -790,11 +789,11 @@ public class DruidCoordinator
}
@VisibleForTesting
- CompactSegments initializeCompactSegmentsDuty()
+ CompactSegments initializeCompactSegmentsDuty(CompactionSegmentSearchPolicy compactionSegmentSearchPolicy)
{
List<CompactSegments> compactSegmentsDutyFromCustomGroups = getCompactSegmentsDutyFromCustomGroups();
if (compactSegmentsDutyFromCustomGroups.isEmpty()) {
- return new CompactSegments(config, objectMapper, indexingServiceClient);
+ return new CompactSegments(config, compactionSegmentSearchPolicy, indexingServiceClient);
} else {
if (compactSegmentsDutyFromCustomGroups.size() > 1) {
log.warn("More than one compactSegments duty is configured in the Coordinator Custom Duty Group. The first duty will be picked up.");
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 153b573eb1..47456a28db 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -94,11 +94,11 @@ public class CompactSegments implements CoordinatorCustomDuty
@JsonCreator
public CompactSegments(
@JacksonInject DruidCoordinatorConfig config,
- @JacksonInject ObjectMapper objectMapper,
+ @JacksonInject CompactionSegmentSearchPolicy policy,
@JacksonInject IndexingServiceClient indexingServiceClient
)
{
- this.policy = new NewestSegmentFirstPolicy(objectMapper);
+ this.policy = policy;
this.indexingServiceClient = indexingServiceClient;
this.skipLockedIntervals = config.getCompactionSkipLockedIntervals();
autoCompactionSnapshotPerDataSource.set(new HashMap<>());
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java
index 8306fab85e..ce4f0e1066 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java
@@ -20,6 +20,7 @@
package org.apache.druid.server.coordinator.duty;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.timeline.SegmentTimeline;
import org.joda.time.Interval;
@@ -34,6 +35,7 @@ public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy
{
private final ObjectMapper objectMapper;
+ @Inject
public NewestSegmentFirstPolicy(ObjectMapper objectMapper)
{
this.objectMapper = objectMapper;
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index a0502d9544..4076ce9d3a 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -59,6 +59,7 @@ import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
import org.apache.druid.server.coordinator.duty.EmitClusterStatsAndMetrics;
import org.apache.druid.server.coordinator.duty.KillSupervisorsCustomDuty;
import org.apache.druid.server.coordinator.duty.LogUsedSegments;
+import org.apache.druid.server.coordinator.duty.NewestSegmentFirstPolicy;
import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
@@ -111,6 +112,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
private DruidCoordinatorConfig druidCoordinatorConfig;
private ObjectMapper objectMapper;
private DruidNode druidNode;
+ private NewestSegmentFirstPolicy newestSegmentFirstPolicy;
private final LatchableServiceEmitter serviceEmitter = new LatchableServiceEmitter();
@Before
@@ -144,6 +146,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
curator.blockUntilConnected();
curator.create().creatingParentsIfNeeded().forPath(LOADPATH);
objectMapper = new DefaultObjectMapper();
+ newestSegmentFirstPolicy = new NewestSegmentFirstPolicy(objectMapper);
druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder()
.withCoordinatorStartDelay(new Duration(COORDINATOR_START_DELAY))
.withCoordinatorPeriod(new Duration(COORDINATOR_PERIOD))
@@ -779,7 +782,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
Assert.assertTrue(compactSegmentsDutyFromCustomGroups.isEmpty());
// CompactSegments returned by this method should be created using the DruidCoordinatorConfig in the DruidCoordinator
- CompactSegments duty = coordinator.initializeCompactSegmentsDuty();
+ CompactSegments duty = coordinator.initializeCompactSegmentsDuty(newestSegmentFirstPolicy);
Assert.assertNotNull(duty);
Assert.assertEquals(druidCoordinatorConfig.getCompactionSkipLockedIntervals(), duty.isSkipLockedIntervals());
}
@@ -819,7 +822,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
Assert.assertTrue(compactSegmentsDutyFromCustomGroups.isEmpty());
// CompactSegments returned by this method should be created using the DruidCoordinatorConfig in the DruidCoordinator
- CompactSegments duty = coordinator.initializeCompactSegmentsDuty();
+ CompactSegments duty = coordinator.initializeCompactSegmentsDuty(newestSegmentFirstPolicy);
Assert.assertNotNull(duty);
Assert.assertEquals(druidCoordinatorConfig.getCompactionSkipLockedIntervals(), duty.isSkipLockedIntervals());
}
@@ -870,7 +873,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
Assert.assertTrue(compactSegmentsDutyFromCustomGroups.get(0) instanceof CompactSegments);
// CompactSegments returned by this method should be from the Custom Duty Group
- CompactSegments duty = coordinator.initializeCompactSegmentsDuty();
+ CompactSegments duty = coordinator.initializeCompactSegmentsDuty(newestSegmentFirstPolicy);
Assert.assertNotNull(duty);
Assert.assertNotEquals(druidCoordinatorConfig.getCompactionSkipLockedIntervals(), duty.isSkipLockedIntervals());
// We should get the CompactSegment from the custom duty group which was created with a different config than the config in DruidCoordinator
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index b0a92fb7fb..510689c9ee 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -139,6 +139,7 @@ public class CompactSegmentsTest
private static final int TOTAL_SEGMENT_PER_DATASOURCE = 44;
private static final int TOTAL_INTERVAL_PER_DATASOURCE = 11;
private static final int MAXIMUM_CAPACITY_WITH_AUTO_SCALE = 10;
+ private static final NewestSegmentFirstPolicy SEARCH_POLICY = new NewestSegmentFirstPolicy(JSON_MAPPER);
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> constructorFeeder()
@@ -250,11 +251,11 @@ public class CompactSegmentsTest
JSON_MAPPER.setInjectableValues(
new InjectableValues.Std()
.addValue(DruidCoordinatorConfig.class, COORDINATOR_CONFIG)
- .addValue(ObjectMapper.class, JSON_MAPPER)
.addValue(IndexingServiceClient.class, indexingServiceClient)
+ .addValue(CompactionSegmentSearchPolicy.class, SEARCH_POLICY)
);
- final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient);
String compactSegmentString = JSON_MAPPER.writeValueAsString(compactSegments);
CompactSegments serdeCompactSegments = JSON_MAPPER.readValue(compactSegmentString, CompactSegments.class);
@@ -269,7 +270,7 @@ public class CompactSegmentsTest
final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
leaderClient.start();
final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
- final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient);
final Supplier<String> expectedVersionSupplier = new Supplier<String>()
{
@@ -347,7 +348,7 @@ public class CompactSegmentsTest
final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
leaderClient.start();
final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
- final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient);
// Before any compaction, we do not have any snapshot of compactions
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
@@ -446,7 +447,7 @@ public class CompactSegmentsTest
final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
leaderClient.start();
final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
- final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient);
// Before any compaction, we do not have any snapshot of compactions
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
@@ -510,7 +511,7 @@ public class CompactSegmentsTest
final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
leaderClient.start();
final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
- final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient);
// Before any compaction, we do not have any snapshot of compactions
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
@@ -606,7 +607,7 @@ public class CompactSegmentsTest
final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
leaderClient.start();
final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
- final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient);
// Before any compaction, we do not have any snapshot of compactions
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
@@ -667,7 +668,7 @@ public class CompactSegmentsTest
final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
leaderClient.start();
final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
- final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient);
final CoordinatorStats stats = doCompactSegments(compactSegments, 3);
Assert.assertEquals(3, stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT));
@@ -683,7 +684,7 @@ public class CompactSegmentsTest
final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
leaderClient.start();
final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
- final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient);
final CoordinatorStats stats = doCompactSegments(compactSegments, createCompactionConfigs(), maxCompactionSlot, true);
Assert.assertEquals(maxCompactionSlot, stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT));
Assert.assertEquals(maxCompactionSlot, stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT));
@@ -698,7 +699,7 @@ public class CompactSegmentsTest
final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
leaderClient.start();
final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
- final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient);
final CoordinatorStats stats = doCompactSegments(compactSegments, createCompactionConfigs(), maxCompactionSlot, true);
Assert.assertEquals(MAXIMUM_CAPACITY_WITH_AUTO_SCALE, stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT));
Assert.assertEquals(MAXIMUM_CAPACITY_WITH_AUTO_SCALE, stats.getGlobalStat(CompactSegments.MAX_COMPACTION_TASK_SLOT));
@@ -709,7 +710,7 @@ public class CompactSegmentsTest
public void testCompactWithoutGranularitySpec()
{
final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
- final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@@ -775,7 +776,7 @@ public class CompactSegmentsTest
public void testCompactWithNotNullIOConfig()
{
final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
- final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@@ -835,7 +836,7 @@ public class CompactSegmentsTest
public void testCompactWithNullIOConfig()
{
final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
- final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@@ -895,7 +896,7 @@ public class CompactSegmentsTest
public void testCompactWithGranularitySpec()
{
final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
- final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@@ -963,7 +964,7 @@ public class CompactSegmentsTest
public void testCompactWithDimensionSpec()
{
final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
- final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@@ -1026,7 +1027,7 @@ public class CompactSegmentsTest
public void testCompactWithoutDimensionSpec()
{
final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
- final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@@ -1088,7 +1089,7 @@ public class CompactSegmentsTest
public void testCompactWithRollupInGranularitySpec()
{
final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
- final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@@ -1192,7 +1193,7 @@ public class CompactSegmentsTest
Mockito.when(mockIndexingServiceClient.getActiveTasks()).thenReturn(ImmutableList.of(runningConflictCompactionTask));
Mockito.when(mockIndexingServiceClient.getTaskPayload(ArgumentMatchers.eq(conflictTaskId))).thenReturn(runningConflictCompactionTaskPayload);
- final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
compactionConfigs.add(
new DataSourceCompactionConfig(
@@ -1266,7 +1267,7 @@ public class CompactSegmentsTest
final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
leaderClient.start();
final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
- final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient);
final CoordinatorStats stats = doCompactSegments(compactSegments, createCompactionConfigs(2), 4);
Assert.assertEquals(4, stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT));
@@ -1300,7 +1301,7 @@ public class CompactSegmentsTest
// Verify that locked intervals are skipped and only one compaction task
// is submitted for dataSource_0
- CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+ CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient);
final CoordinatorStats stats = doCompactSegments(compactSegments, createCompactionConfigs(2), 4);
Assert.assertEquals(1, stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT));
Assert.assertEquals(1, leaderClient.submittedCompactionTasks.size());
@@ -1318,7 +1319,7 @@ public class CompactSegmentsTest
{
NullHandling.initializeForTests();
final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
- final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@@ -1381,7 +1382,7 @@ public class CompactSegmentsTest
public void testCompactWithoutCustomSpecs()
{
final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
- final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@@ -1448,7 +1449,7 @@ public class CompactSegmentsTest
NullHandling.initializeForTests();
AggregatorFactory[] aggregatorFactories = new AggregatorFactory[] {new CountAggregatorFactory("cnt")};
final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
- final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@@ -1532,7 +1533,7 @@ public class CompactSegmentsTest
.add(Intervals.of("2017/2018"));
// Verify that no locked intervals are skipped
- CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+ CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient);
int maxTaskSlots = partitionsSpec instanceof SingleDimensionPartitionsSpec ? 5 : 3;
final CoordinatorStats stats = doCompactSegments(compactSegments, createCompactionConfigs(1), maxTaskSlots);
Assert.assertEquals(3, stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT));
@@ -1597,7 +1598,7 @@ public class CompactSegmentsTest
final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
- final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
compactionConfigs.add(
new DataSourceCompactionConfig(
@@ -1695,7 +1696,7 @@ public class CompactSegmentsTest
final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
- final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
compactionConfigs.add(
new DataSourceCompactionConfig(
@@ -1760,7 +1761,7 @@ public class CompactSegmentsTest
public void testCompactWithMetricsSpecShouldSetPreserveExistingMetricsTrue()
{
final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
- final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@@ -1823,7 +1824,7 @@ public class CompactSegmentsTest
public void testCompactWithoutMetricsSpecShouldSetPreserveExistingMetricsFalse()
{
final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
- final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
index 7066183c3a..054d34422d 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
@@ -47,7 +47,9 @@ import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.LoadQueueTaskMaster;
import org.apache.druid.server.coordinator.RandomBalancerStrategyFactory;
import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.duty.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
+import org.apache.druid.server.coordinator.duty.NewestSegmentFirstPolicy;
import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.server.lookup.cache.LookupCoordinatorManager;
import org.apache.druid.timeline.DataSegment;
@@ -78,7 +80,8 @@ public class CoordinatorSimulationBuilder
DataSegment.PruneSpecsHolder.DEFAULT
)
);
-
+ private static final CompactionSegmentSearchPolicy COMPACTION_SEGMENT_SEARCH_POLICY =
+ new NewestSegmentFirstPolicy(OBJECT_MAPPER);
private String balancerStrategy;
private CoordinatorDynamicConfig dynamicConfig =
CoordinatorDynamicConfig.builder()
@@ -210,7 +213,7 @@ public class CoordinatorSimulationBuilder
createBalancerStrategy(env),
env.lookupCoordinatorManager,
env.leaderSelector,
- OBJECT_MAPPER
+ COMPACTION_SEGMENT_SEARCH_POLICY
);
return new SimulationImpl(coordinator, env);
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index 841034e2be..5684c3871c 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -83,6 +83,7 @@ import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.KillStalePendingSegments;
import org.apache.druid.server.coordinator.LoadQueueTaskMaster;
+import org.apache.druid.server.coordinator.duty.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
@@ -93,6 +94,7 @@ import org.apache.druid.server.coordinator.duty.KillDatasourceMetadata;
import org.apache.druid.server.coordinator.duty.KillRules;
import org.apache.druid.server.coordinator.duty.KillSupervisors;
import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
+import org.apache.druid.server.coordinator.duty.NewestSegmentFirstPolicy;
import org.apache.druid.server.http.ClusterResource;
import org.apache.druid.server.http.CompactionResource;
import org.apache.druid.server.http.CoordinatorCompactionConfigsResource;
@@ -312,6 +314,9 @@ public class CliCoordinator extends ServerRunnable
KillCompactionConfig.class
);
+ //TODO: make this configurable when there are multiple search policies
+ binder.bind(CompactionSegmentSearchPolicy.class).to(NewestSegmentFirstPolicy.class);
+
bindAnnouncer(
binder,
Coordinator.class,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org