You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ma...@apache.org on 2022/02/19 07:03:25 UTC

[druid] branch master updated: Allow coordinator run auto compaction duty period to be configured separately from other indexing duties (#12263)

This is an automated email from the ASF dual-hosted git repository.

maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e2eded  Allow coordinator run auto compaction duty period to be configured separately from other indexing duties (#12263)
6e2eded is described below

commit 6e2eded277bec907466ce0b7823ce70fae6e388e
Author: Maytas Monsereenusorn <ma...@apache.org>
AuthorDate: Fri Feb 18 23:02:57 2022 -0800

    Allow coordinator run auto compaction duty period to be configured separately from other indexing duties (#12263)
    
    * add impl
    
    * add impl
    
    * add unit tests
    
    * add impl
    
    * add impl
    
    * add serde test
    
    * add tests
    
    * add docs
    
    * fix test
    
    * fix test
    
    * fix docs
    
    * fix docs
    
    * fix spelling
---
 docs/design/coordinator.md                         |   7 +
 .../druid/server/coordinator/DruidCoordinator.java |  47 ++++-
 .../server/coordinator/duty/CompactSegments.java   |  23 ++-
 .../coordinator/duty/CoordinatorCustomDuty.java    |   1 +
 .../server/coordinator/DruidCoordinatorTest.java   | 226 ++++++++++++++++++++-
 .../coordinator/TestDruidCoordinatorConfig.java    |  52 ++++-
 .../coordinator/duty/CompactSegmentsTest.java      |  24 +++
 7 files changed, 365 insertions(+), 15 deletions(-)

diff --git a/docs/design/coordinator.md b/docs/design/coordinator.md
index d5769a9..a3d33ec 100644
--- a/docs/design/coordinator.md
+++ b/docs/design/coordinator.md
@@ -98,6 +98,13 @@ Compaction tasks might fail due to the following reasons.
 
 Once a compaction task fails, the Coordinator simply checks the segments in the interval of the failed task again, and issues another compaction task in the next run.
 
+Note that Compacting Segments Coordinator Duty is automatically enabled and run as part of the Indexing Service Duties group. However, Compacting Segments Coordinator Duty can be configured to run in isolation as a separate coordinator duty group. This allows changing the period of Compacting Segments Coordinator Duty without impacting the period of other Indexing Service Duties. This can be done by setting the following properties (for more details see [custom pluggable Coordinator Duty [...]
+```
+druid.coordinator.dutyGroups=[<SOME_GROUP_NAME>]
+druid.coordinator.<SOME_GROUP_NAME>.duties=["compactSegments"]
+druid.coordinator.<SOME_GROUP_NAME>.period=<PERIOD_TO_RUN_COMPACTING_SEGMENTS_DUTY>
+```
+
 ### Segment search policy
 
 #### Recent segment first policy
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index c901cb9..8c060ee 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.server.coordinator;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Ordering;
@@ -158,7 +159,7 @@ public class DruidCoordinator
   private final BalancerStrategyFactory factory;
   private final LookupCoordinatorManager lookupCoordinatorManager;
   private final DruidLeaderSelector coordLeaderSelector;
-
+  private final ObjectMapper objectMapper;
   private final CompactSegments compactSegments;
 
   private volatile boolean started = false;
@@ -194,7 +195,7 @@ public class DruidCoordinator
       BalancerStrategyFactory factory,
       LookupCoordinatorManager lookupCoordinatorManager,
       @Coordinator DruidLeaderSelector coordLeaderSelector,
-      CompactSegments compactSegments,
+      ObjectMapper objectMapper,
       ZkEnablementConfig zkEnablementConfig
   )
   {
@@ -219,7 +220,7 @@ public class DruidCoordinator
         factory,
         lookupCoordinatorManager,
         coordLeaderSelector,
-        compactSegments,
+        objectMapper,
         zkEnablementConfig
     );
   }
@@ -245,7 +246,7 @@ public class DruidCoordinator
       BalancerStrategyFactory factory,
       LookupCoordinatorManager lookupCoordinatorManager,
       DruidLeaderSelector coordLeaderSelector,
-      CompactSegments compactSegments,
+      ObjectMapper objectMapper,
       ZkEnablementConfig zkEnablementConfig
   )
   {
@@ -276,8 +277,8 @@ public class DruidCoordinator
     this.factory = factory;
     this.lookupCoordinatorManager = lookupCoordinatorManager;
     this.coordLeaderSelector = coordLeaderSelector;
-
-    this.compactSegments = compactSegments;
+    this.objectMapper = objectMapper;
+    this.compactSegments = initializeCompactSegmentsDuty();
   }
 
   public boolean isLeader()
@@ -769,14 +770,17 @@ public class DruidCoordinator
     );
   }
 
-  private List<CoordinatorDuty> makeIndexingServiceDuties()
+  @VisibleForTesting
+  List<CoordinatorDuty> makeIndexingServiceDuties()
   {
     List<CoordinatorDuty> duties = new ArrayList<>();
     duties.add(new LogUsedSegments());
     duties.addAll(indexingServiceDuties);
     // CompactSegmentsDuty should be the last duty as it can take a long time to complete
-    duties.addAll(makeCompactSegmentsDuty());
-
+    // We do not have to add compactSegments if it is already enabled in the custom duty group
+    if (getCompactSegmentsDutyFromCustomGroups().isEmpty()) {
+      duties.addAll(makeCompactSegmentsDuty());
+    }
     log.debug(
         "Done making indexing service duties %s",
         duties.stream().map(duty -> duty.getClass().getName()).collect(Collectors.toList())
@@ -797,6 +801,31 @@ public class DruidCoordinator
     return ImmutableList.copyOf(duties);
   }
 
+  @VisibleForTesting
+  CompactSegments initializeCompactSegmentsDuty()
+  {
+    List<CompactSegments> compactSegmentsDutyFromCustomGroups = getCompactSegmentsDutyFromCustomGroups();
+    if (compactSegmentsDutyFromCustomGroups.isEmpty()) {
+      return new CompactSegments(config, objectMapper, indexingServiceClient);
+    } else {
+      if (compactSegmentsDutyFromCustomGroups.size() > 1) {
+        log.warn("More than one compactSegments duty is configured in the Coordinator Custom Duty Group. The first duty will be picked up.");
+      }
+      return compactSegmentsDutyFromCustomGroups.get(0);
+    }
+  }
+
+  @VisibleForTesting
+  List<CompactSegments> getCompactSegmentsDutyFromCustomGroups()
+  {
+    return customDutyGroups.getCoordinatorCustomDutyGroups()
+                           .stream()
+                           .flatMap(coordinatorCustomDutyGroup -> coordinatorCustomDutyGroup.getCustomDutyList().stream())
+                           .filter(duty -> duty instanceof CompactSegments)
+                           .map(duty -> (CompactSegments) duty)
+                           .collect(Collectors.toList());
+  }
+
   private List<CoordinatorDuty> makeCompactSegmentsDuty()
   {
     return ImmutableList.of(compactSegments);
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 6598a6e..e3f47dd 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.server.coordinator.duty;
 
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.Inject;
@@ -56,7 +58,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-public class CompactSegments implements CoordinatorDuty
+public class CompactSegments implements CoordinatorCustomDuty
 {
   static final String COMPACTION_TASK_COUNT = "compactTaskCount";
   static final String AVAILABLE_COMPACTION_TASK_SLOT = "availableCompactionTaskSlot";
@@ -90,10 +92,11 @@ public class CompactSegments implements CoordinatorDuty
   private final AtomicReference<Map<String, AutoCompactionSnapshot>> autoCompactionSnapshotPerDataSource = new AtomicReference<>();
 
   @Inject
+  @JsonCreator
   public CompactSegments(
-      DruidCoordinatorConfig config,
-      ObjectMapper objectMapper,
-      IndexingServiceClient indexingServiceClient
+      @JacksonInject DruidCoordinatorConfig config,
+      @JacksonInject ObjectMapper objectMapper,
+      @JacksonInject IndexingServiceClient indexingServiceClient
   )
   {
     this.policy = new NewestSegmentFirstPolicy(objectMapper);
@@ -104,6 +107,18 @@ public class CompactSegments implements CoordinatorDuty
     LOG.info("Scheduling compaction with skipLockedIntervals [%s]", skipLockedIntervals);
   }
 
+  @VisibleForTesting
+  public boolean isSkipLockedIntervals()
+  {
+    return skipLockedIntervals;
+  }
+
+  @VisibleForTesting
+  IndexingServiceClient getIndexingServiceClient()
+  {
+    return indexingServiceClient;
+  }
+
   @Override
   public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
   {
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorCustomDuty.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorCustomDuty.java
index 86e2032..d481714 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorCustomDuty.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorCustomDuty.java
@@ -51,6 +51,7 @@ import org.apache.druid.initialization.DruidModule;
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
 @JsonSubTypes({
     @JsonSubTypes.Type(name = "killSupervisors", value = KillSupervisorsCustomDuty.class),
+    @JsonSubTypes.Type(name = "compactSegments", value = CompactSegments.class),
 })
 @ExtensionPoint
 public interface CoordinatorCustomDuty extends CoordinatorDuty
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index ec0201b..0363748 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -52,9 +52,12 @@ import org.apache.druid.metadata.SegmentsMetadataManager;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.duty.CompactSegments;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
+import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
+import org.apache.druid.server.coordinator.duty.KillSupervisorsCustomDuty;
 import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
 import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
 import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
@@ -74,6 +77,7 @@ import org.junit.Test;
 import javax.annotation.Nullable;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -735,7 +739,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
     EasyMock.replay(configManager, dynamicConfig, scheduledExecutorFactory);
 
     DruidCoordinator c = new DruidCoordinator(
-        null,
+        druidCoordinatorConfig,
         null,
         configManager,
         null,
@@ -786,6 +790,226 @@ public class DruidCoordinatorTest extends CuratorTestBase
     Assert.assertFalse(firstExec == thirdExec);
   }
 
+  @Test
+  public void testCompactSegmentsDutyWhenCustomDutyGroupEmpty()
+  {
+    CoordinatorCustomDutyGroups emptyCustomDutyGroups = new CoordinatorCustomDutyGroups(ImmutableSet.of());
+    coordinator = new DruidCoordinator(
+        druidCoordinatorConfig,
+        new ZkPathsConfig()
+        {
+
+          @Override
+          public String getBase()
+          {
+            return "druid";
+          }
+        },
+        null,
+        segmentsMetadataManager,
+        serverInventoryView,
+        metadataRuleManager,
+        () -> curator,
+        serviceEmitter,
+        scheduledExecutorFactory,
+        null,
+        null,
+        new NoopServiceAnnouncer()
+        {
+          @Override
+          public void announce(DruidNode node)
+          {
+            // count down when this coordinator becomes the leader
+            leaderAnnouncerLatch.countDown();
+          }
+
+          @Override
+          public void unannounce(DruidNode node)
+          {
+            leaderUnannouncerLatch.countDown();
+          }
+        },
+        druidNode,
+        loadManagementPeons,
+        ImmutableSet.of(),
+        new HashSet<>(),
+        emptyCustomDutyGroups,
+        new CostBalancerStrategyFactory(),
+        EasyMock.createNiceMock(LookupCoordinatorManager.class),
+        new TestDruidLeaderSelector(),
+        null,
+        ZkEnablementConfig.ENABLED
+    );
+    // Since CompactSegments is not enabled in any Custom Duty Group, CompactSegments must be created in IndexingServiceDuties
+    List<CoordinatorDuty> indexingDuties = coordinator.makeIndexingServiceDuties();
+    Assert.assertTrue(indexingDuties.stream().anyMatch(coordinatorDuty -> coordinatorDuty instanceof CompactSegments));
+
+    // CompactSegments should not exist in Custom Duty Group
+    List<CompactSegments> compactSegmentsDutyFromCustomGroups = coordinator.getCompactSegmentsDutyFromCustomGroups();
+    Assert.assertTrue(compactSegmentsDutyFromCustomGroups.isEmpty());
+
+    // CompactSegments returned by this method should be created using the DruidCoordinatorConfig in the DruidCoordinator
+    CompactSegments duty = coordinator.initializeCompactSegmentsDuty();
+    Assert.assertNotNull(duty);
+    Assert.assertEquals(druidCoordinatorConfig.getCompactionSkipLockedIntervals(), duty.isSkipLockedIntervals());
+  }
+
+  @Test
+  public void testInitializeCompactSegmentsDutyWhenCustomDutyGroupDoesNotContainsCompactSegments()
+  {
+    CoordinatorCustomDutyGroup group = new CoordinatorCustomDutyGroup("group1", Duration.standardSeconds(1), ImmutableList.of(new KillSupervisorsCustomDuty(new Duration("PT1S"), null)));
+    CoordinatorCustomDutyGroups customDutyGroups = new CoordinatorCustomDutyGroups(ImmutableSet.of(group));
+    coordinator = new DruidCoordinator(
+        druidCoordinatorConfig,
+        new ZkPathsConfig()
+        {
+
+          @Override
+          public String getBase()
+          {
+            return "druid";
+          }
+        },
+        null,
+        segmentsMetadataManager,
+        serverInventoryView,
+        metadataRuleManager,
+        () -> curator,
+        serviceEmitter,
+        scheduledExecutorFactory,
+        null,
+        null,
+        new NoopServiceAnnouncer()
+        {
+          @Override
+          public void announce(DruidNode node)
+          {
+            // count down when this coordinator becomes the leader
+            leaderAnnouncerLatch.countDown();
+          }
+
+          @Override
+          public void unannounce(DruidNode node)
+          {
+            leaderUnannouncerLatch.countDown();
+          }
+        },
+        druidNode,
+        loadManagementPeons,
+        ImmutableSet.of(),
+        new HashSet<>(),
+        customDutyGroups,
+        new CostBalancerStrategyFactory(),
+        EasyMock.createNiceMock(LookupCoordinatorManager.class),
+        new TestDruidLeaderSelector(),
+        null,
+        ZkEnablementConfig.ENABLED
+    );
+    // Since CompactSegments is not enabled in any Custom Duty Group, CompactSegments must be created in IndexingServiceDuties
+    List<CoordinatorDuty> indexingDuties = coordinator.makeIndexingServiceDuties();
+    Assert.assertTrue(indexingDuties.stream().anyMatch(coordinatorDuty -> coordinatorDuty instanceof CompactSegments));
+
+    // CompactSegments should not exist in Custom Duty Group
+    List<CompactSegments> compactSegmentsDutyFromCustomGroups = coordinator.getCompactSegmentsDutyFromCustomGroups();
+    Assert.assertTrue(compactSegmentsDutyFromCustomGroups.isEmpty());
+
+    // CompactSegments returned by this method should be created using the DruidCoordinatorConfig in the DruidCoordinator
+    CompactSegments duty = coordinator.initializeCompactSegmentsDuty();
+    Assert.assertNotNull(duty);
+    Assert.assertEquals(druidCoordinatorConfig.getCompactionSkipLockedIntervals(), duty.isSkipLockedIntervals());
+  }
+
+  @Test
+  public void testInitializeCompactSegmentsDutyWhenCustomDutyGroupContainsCompactSegments()
+  {
+    DruidCoordinatorConfig differentConfigUsedInCustomGroup = new TestDruidCoordinatorConfig(
+        new Duration(COORDINATOR_START_DELAY),
+        new Duration(COORDINATOR_PERIOD),
+        null,
+        null,
+        null,
+        new Duration(COORDINATOR_PERIOD),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        10,
+        new Duration("PT0s"),
+        false
+    );
+    CoordinatorCustomDutyGroup compactSegmentCustomGroup = new CoordinatorCustomDutyGroup("group1", Duration.standardSeconds(1), ImmutableList.of(new CompactSegments(differentConfigUsedInCustomGroup, null, null)));
+    CoordinatorCustomDutyGroups customDutyGroups = new CoordinatorCustomDutyGroups(ImmutableSet.of(compactSegmentCustomGroup));
+    coordinator = new DruidCoordinator(
+        druidCoordinatorConfig,
+        new ZkPathsConfig()
+        {
+
+          @Override
+          public String getBase()
+          {
+            return "druid";
+          }
+        },
+        null,
+        segmentsMetadataManager,
+        serverInventoryView,
+        metadataRuleManager,
+        () -> curator,
+        serviceEmitter,
+        scheduledExecutorFactory,
+        null,
+        null,
+        new NoopServiceAnnouncer()
+        {
+          @Override
+          public void announce(DruidNode node)
+          {
+            // count down when this coordinator becomes the leader
+            leaderAnnouncerLatch.countDown();
+          }
+
+          @Override
+          public void unannounce(DruidNode node)
+          {
+            leaderUnannouncerLatch.countDown();
+          }
+        },
+        druidNode,
+        loadManagementPeons,
+        ImmutableSet.of(),
+        new HashSet<>(),
+        customDutyGroups,
+        new CostBalancerStrategyFactory(),
+        EasyMock.createNiceMock(LookupCoordinatorManager.class),
+        new TestDruidLeaderSelector(),
+        null,
+        ZkEnablementConfig.ENABLED
+    );
+    // Since CompactSegments is enabled in a Custom Duty Group, CompactSegments must not be created in IndexingServiceDuties
+    List<CoordinatorDuty> indexingDuties = coordinator.makeIndexingServiceDuties();
+    Assert.assertTrue(indexingDuties.stream().noneMatch(coordinatorDuty -> coordinatorDuty instanceof CompactSegments));
+
+    // CompactSegments should exist in Custom Duty Group
+    List<CompactSegments> compactSegmentsDutyFromCustomGroups = coordinator.getCompactSegmentsDutyFromCustomGroups();
+    Assert.assertFalse(compactSegmentsDutyFromCustomGroups.isEmpty());
+    Assert.assertEquals(1, compactSegmentsDutyFromCustomGroups.size());
+    Assert.assertNotNull(compactSegmentsDutyFromCustomGroups.get(0));
+    Assert.assertTrue(compactSegmentsDutyFromCustomGroups.get(0) instanceof CompactSegments);
+
+    // CompactSegments returned by this method should be from the Custom Duty Group
+    CompactSegments duty = coordinator.initializeCompactSegmentsDuty();
+    Assert.assertNotNull(duty);
+    Assert.assertNotEquals(druidCoordinatorConfig.getCompactionSkipLockedIntervals(), duty.isSkipLockedIntervals());
+    // We should get the CompactSegments duty from the custom duty group, which was created with a different config than the config in DruidCoordinator
+    Assert.assertEquals(differentConfigUsedInCustomGroup.getCompactionSkipLockedIntervals(), duty.isSkipLockedIntervals());
+  }
+
   @Test(timeout = 3000)
   public void testCoordinatorCustomDutyGroupsRunAsExpected() throws Exception
   {
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
index 196e205..abb500e 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
@@ -23,7 +23,6 @@ import org.joda.time.Duration;
 
 public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
 {
-
   private final Duration coordinatorStartDelay;
   private final Duration coordinatorPeriod;
   private final Duration coordinatorIndexingPeriod;
@@ -42,6 +41,7 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
   private final Duration coordinatorDatasourceKillDurationToRetain;
   private final Duration getLoadQueuePeonRepeatDelay;
   private final int coordinatorKillMaxSegments;
+  private final boolean compactionSkipLockedIntervals;
 
   public TestDruidCoordinatorConfig(
       Duration coordinatorStartDelay,
@@ -82,6 +82,50 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
     this.coordinatorDatasourceKillDurationToRetain = coordinatorDatasourceKillDurationToRetain;
     this.coordinatorKillMaxSegments = coordinatorKillMaxSegments;
     this.getLoadQueuePeonRepeatDelay = getLoadQueuePeonRepeatDelay;
+    this.compactionSkipLockedIntervals = true;
+  }
+
+  public TestDruidCoordinatorConfig(
+      Duration coordinatorStartDelay,
+      Duration coordinatorPeriod,
+      Duration coordinatorIndexingPeriod,
+      Duration metadataStoreManagementPeriod,
+      Duration loadTimeoutDelay,
+      Duration coordinatorKillPeriod,
+      Duration coordinatorKillDurationToRetain,
+      Duration coordinatorSupervisorKillPeriod,
+      Duration coordinatorSupervisorKillDurationToRetain,
+      Duration coordinatorAuditKillPeriod,
+      Duration coordinatorAuditKillDurationToRetain,
+      Duration coordinatorCompactionKillPeriod,
+      Duration coordinatorRuleKillPeriod,
+      Duration coordinatorRuleKillDurationToRetain,
+      Duration coordinatorDatasourceKillPeriod,
+      Duration coordinatorDatasourceKillDurationToRetain,
+      int coordinatorKillMaxSegments,
+      Duration getLoadQueuePeonRepeatDelay,
+      boolean compactionSkipLockedIntervals
+  )
+  {
+    this.coordinatorStartDelay = coordinatorStartDelay;
+    this.coordinatorPeriod = coordinatorPeriod;
+    this.coordinatorIndexingPeriod = coordinatorIndexingPeriod;
+    this.metadataStoreManagementPeriod = metadataStoreManagementPeriod;
+    this.loadTimeoutDelay = loadTimeoutDelay;
+    this.coordinatorKillPeriod = coordinatorKillPeriod;
+    this.coordinatorKillDurationToRetain = coordinatorKillDurationToRetain;
+    this.coordinatorSupervisorKillPeriod = coordinatorSupervisorKillPeriod;
+    this.coordinatorSupervisorKillDurationToRetain = coordinatorSupervisorKillDurationToRetain;
+    this.coordinatorAuditKillPeriod = coordinatorAuditKillPeriod;
+    this.coordinatorAuditKillDurationToRetain = coordinatorAuditKillDurationToRetain;
+    this.coordinatorCompactionKillPeriod = coordinatorCompactionKillPeriod;
+    this.coordinatorRuleKillPeriod = coordinatorRuleKillPeriod;
+    this.coordinatorRuleKillDurationToRetain = coordinatorRuleKillDurationToRetain;
+    this.coordinatorDatasourceKillPeriod = coordinatorDatasourceKillPeriod;
+    this.coordinatorDatasourceKillDurationToRetain = coordinatorDatasourceKillDurationToRetain;
+    this.coordinatorKillMaxSegments = coordinatorKillMaxSegments;
+    this.getLoadQueuePeonRepeatDelay = getLoadQueuePeonRepeatDelay;
+    this.compactionSkipLockedIntervals = compactionSkipLockedIntervals;
   }
 
   @Override
@@ -191,4 +235,10 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
   {
     return getLoadQueuePeonRepeatDelay;
   }
+
+  @Override
+  public boolean getCompactionSkipLockedIntervals()
+  {
+    return compactionSkipLockedIntervals;
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 2892d44..bd4e601 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.server.coordinator.duty;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -38,6 +39,7 @@ import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
 import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
 import org.apache.druid.client.indexing.ClientTaskQuery;
 import org.apache.druid.client.indexing.HttpIndexingServiceClient;
+import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
 import org.apache.druid.client.indexing.IndexingWorker;
 import org.apache.druid.client.indexing.IndexingWorkerInfo;
@@ -238,6 +240,28 @@ public class CompactSegmentsTest
   }
 
   @Test
+  public void testSerde() throws Exception
+  {
+    final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
+    final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
+
+    JSON_MAPPER.setInjectableValues(
+        new InjectableValues.Std()
+            .addValue(DruidCoordinatorConfig.class, COORDINATOR_CONFIG)
+            .addValue(ObjectMapper.class, JSON_MAPPER)
+            .addValue(IndexingServiceClient.class, indexingServiceClient)
+    );
+
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+    String compactSegmentString = JSON_MAPPER.writeValueAsString(compactSegments);
+    CompactSegments serdeCompactSegments = JSON_MAPPER.readValue(compactSegmentString, CompactSegments.class);
+
+    Assert.assertNotNull(serdeCompactSegments);
+    Assert.assertEquals(COORDINATOR_CONFIG.getCompactionSkipLockedIntervals(), serdeCompactSegments.isSkipLockedIntervals());
+    Assert.assertEquals(indexingServiceClient, serdeCompactSegments.getIndexingServiceClient());
+  }
+
+  @Test
   public void testRun()
   {
     final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org