Posted to commits@druid.apache.org by "clintropolis (via GitHub)" <gi...@apache.org> on 2023/02/28 05:21:10 UTC

[GitHub] [druid] clintropolis commented on a diff in pull request #13852: Continuous automatic compaction

clintropolis commented on code in PR #13852:
URL: https://github.com/apache/druid/pull/13852#discussion_r1119537778


##########
server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java:
##########
@@ -20,34 +20,78 @@
 package org.apache.druid.server.coordinator.duty;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
+import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
 import org.apache.druid.timeline.SegmentTimeline;
 import org.joda.time.Interval;
 
+import java.time.Clock;
 import java.util.List;
 import java.util.Map;
 
 /**
  * This policy searches segments for compaction from the newest one to oldest one.
+ * The {@link #resetIfNeeded} functionality is inspired by {@link com.google.common.base.Suppliers.ExpiringMemoizingSupplier}.
  */
 public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy
 {
   private final ObjectMapper objectMapper;
+  private final long durationMillis;
+  private transient volatile NewestSegmentFirstIterator iterator;

Review Comment:
   is `transient` really needed here? (tbh I'm not very familiar with this keyword... 😅 )
   
   Afaict it's related to `java.io.Serializable`, which afaik we don't really use, but I could totally be wrong. I see barely any places using it in our codebase anyway...
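
For context, a minimal standalone sketch of what `transient` actually does, using only the JDK and nothing from the PR: a transient field is simply skipped by Java serialization, which is why the keyword only matters for classes that implement `java.io.Serializable`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Illustration only; none of this is from the PR under review.
public class TransientDemo implements Serializable
{
  private final String kept = "written to the stream";
  private transient String skipped = "omitted from the stream";

  public static void main(String[] args) throws IOException
  {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      // Java serialization writes `kept` but skips `skipped`; after deserialization,
      // `skipped` would come back as null. If the class is never serialized with
      // java.io serialization, the `transient` modifier has no effect at all.
      out.writeObject(new TransientDemo());
    }
  }
}
```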



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java:
##########
@@ -20,34 +20,78 @@
 package org.apache.druid.server.coordinator.duty;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
+import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
 import org.apache.druid.timeline.SegmentTimeline;
 import org.joda.time.Interval;
 
+import java.time.Clock;
 import java.util.List;
 import java.util.Map;
 
 /**
  * This policy searches segments for compaction from the newest one to oldest one.
+ * The {@link #resetIfNeeded} functionality is inspired by {@link com.google.common.base.Suppliers.ExpiringMemoizingSupplier}.
  */
 public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy
 {
   private final ObjectMapper objectMapper;
+  private final long durationMillis;
+  private transient volatile NewestSegmentFirstIterator iterator;
+  // The special value 0 means "not yet initialized".
+  private transient volatile long expirationMillis;
+  private final Clock clock;
 
   @Inject
-  public NewestSegmentFirstPolicy(ObjectMapper objectMapper)
+  public NewestSegmentFirstPolicy(ObjectMapper objectMapper, DruidCoordinatorConfig config, Clock clock)
   {
     this.objectMapper = objectMapper;
+    this.durationMillis = config.getCompactionSearchPolicyRefreshPeriod().getMillis();
+    this.clock = clock;
+    Preconditions.checkArgument(durationMillis > 0);
   }
 
   @Override
-  public CompactionSegmentIterator reset(
+  public Pair<CompactionSegmentIterator, Boolean> resetIfNeeded(
       Map<String, DataSourceCompactionConfig> compactionConfigs,
       Map<String, SegmentTimeline> dataSources,
       Map<String, List<Interval>> skipIntervals
   )
   {
-    return new NewestSegmentFirstIterator(objectMapper, compactionConfigs, dataSources, skipIntervals);
+    long millis = expirationMillis;
+    long now = clock.millis();
+    if (millis == 0 || now - millis >= 0) {
+      synchronized (this) {

Review Comment:
   why does this need synchronized (or why doesn't `reset` need synchronized?)
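
For reference, a rough sketch (not the PR's actual code) of the double-checked expiration pattern this block appears to follow, in the spirit of the `Suppliers.ExpiringMemoizingSupplier` referenced in the class javadoc: the unsynchronized read of the volatile expiration is the fast path, and the `synchronized` block plus recheck keep two threads from rebuilding the cached value at the same time once it expires.

```java
import java.time.Clock;
import java.util.function.Supplier;

// Sketch of the pattern, with all names assumed for illustration.
public class ExpiringHolder<T>
{
  private final Clock clock;
  private final long durationMillis;
  private volatile T value;
  // 0 means "not yet initialized", matching the convention in the diff above.
  private volatile long expirationMillis;

  public ExpiringHolder(Clock clock, long durationMillis)
  {
    this.clock = clock;
    this.durationMillis = durationMillis;
  }

  public T get(Supplier<T> factory)
  {
    long expiration = expirationMillis;
    long now = clock.millis();
    if (expiration == 0 || now - expiration >= 0) {
      synchronized (this) {
        // Recheck under the lock: another thread may have refreshed the value already,
        // in which case expirationMillis has moved and this thread skips the rebuild.
        if (expiration == expirationMillis) {
          value = factory.get();
          expirationMillis = now + durationMillis;
        }
      }
    }
    return value;
  }
}
```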



##########
server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java:
##########
@@ -137,4 +137,7 @@ public boolean getCompactionSkipLockedIntervals()
     return true;
   }
 
+  @Config("druid.coordinator.compaction.searchPolicyRefreshPeriod")
+  @Default("PT5M")

Review Comment:
   curious, why here instead of on the `CompactSegments` duty? I guess that would make the property `druid.coordinator.compaction.duty.searchPolicyRefreshPeriod` instead
   
   side note, coordinator config in general seems really complicated 😅 I had to read a bunch of code to understand how custom duties work and get wired up to stuff... and it's kind of strange.
   
   I guess what I'm getting at is that having this refresh period be more frequent than the duty period seems like it would be an incorrect configuration (or at least useless, since it would always reset), but I'm not entirely sure how such a check could actually be wired up. Maybe the duty period could be added to the properties that get injected so the compaction duty could pick it up or something? (see the sketch after this comment)
   
   There is also `CoordinatorCompactionConfig` to add to the confusion... not to mention `druid.coordinator.kill.compaction.period` which _does_ live here...
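
To make the concern above concrete, a purely hypothetical sketch of the kind of check being suggested; every name here is assumed and nothing like it exists in the PR. The idea is that if the duty period that drives how often compaction runs were injected alongside the new refresh period, a refresh period shorter than the duty period could be rejected up front, since it would expire before every run anyway.

```java
import org.joda.time.Duration;

// Hypothetical validation sketch; not part of the PR or of Druid.
public class CompactionSearchPolicyConfigCheck
{
  public static void validate(Duration dutyPeriod, Duration searchPolicyRefreshPeriod)
  {
    if (searchPolicyRefreshPeriod.isShorterThan(dutyPeriod)) {
      throw new IllegalArgumentException(
          "searchPolicyRefreshPeriod [" + searchPolicyRefreshPeriod
          + "] is shorter than the coordinator duty period [" + dutyPeriod
          + "], so the policy would be reset on every duty run"
      );
    }
  }
}
```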



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org