You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by su...@apache.org on 2023/10/30 17:43:04 UTC

(druid) branch master updated: LoadRules with 0 replicas should be treated as handoff complete (#15274)

This is an automated email from the ASF dual-hosted git repository.

suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e6b7c36e740 LoadRules with 0 replicas should be treated as handoff complete (#15274)
e6b7c36e740 is described below

commit e6b7c36e740067742aa7f578e72e8a3c927572ca
Author: Suneet Saldanha <su...@apache.org>
AuthorDate: Mon Oct 30 10:42:58 2023 -0700

    LoadRules with 0 replicas should be treated as handoff complete (#15274)
    
    * LoadRules with 0 replicas should be treated as handoff complete
    
    * fix it
    
    * pr feedback
    
    * fixit
---
 .../druid/server/coordinator/rules/LoadRule.java   | 37 +++++++++++++++++++---
 .../druid/server/http/DataSourcesResource.java     | 15 +++++----
 .../server/coordinator/rules/LoadRuleTest.java     |  9 ++++--
 .../coordinator/rules/PeriodLoadRuleTest.java      |  1 +
 4 files changed, 49 insertions(+), 13 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
index 5d7b724c845..5463dd85498 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.server.coordinator.rules;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.client.DruidServer;
@@ -40,11 +41,14 @@ public abstract class LoadRule implements Rule
    */
   private final boolean useDefaultTierForNull;
 
+  private final boolean shouldSegmentBeLoaded;
+
   protected LoadRule(Map<String, Integer> tieredReplicants, Boolean useDefaultTierForNull)
   {
     this.useDefaultTierForNull = Configs.valueOrDefault(useDefaultTierForNull, true);
     this.tieredReplicants = handleNullTieredReplicants(tieredReplicants, this.useDefaultTierForNull);
     validateTieredReplicants(this.tieredReplicants);
+    this.shouldSegmentBeLoaded = this.tieredReplicants.values().stream().reduce(0, Integer::sum) > 0;
   }
 
   @JsonProperty
@@ -65,6 +69,18 @@ public abstract class LoadRule implements Rule
     handler.replicateSegment(segment, getTieredReplicants());
   }
 
+
+  /**
+   * Used in making handoff decisions.
+   *
+   * @return true if a segment matching this rule must be loaded on some tier, i.e. the total replica count is positive.
+   */
+  @JsonIgnore
+  public boolean shouldMatchingSegmentBeLoaded()
+  {
+    return shouldSegmentBeLoaded;
+  }
+
   /**
    * Returns the given {@code tieredReplicants} map unchanged if it is non-null (including empty).
    * Returns the following default values if the given map is null.
@@ -73,10 +89,16 @@ public abstract class LoadRule implements Rule
    * <li>If {@code useDefaultTierForNull} is false, returns an empty map. This causes segments to have a replication factor of 0 and not get assigned to any historical.</li>
    * </ul>
    */
-  private static Map<String, Integer> handleNullTieredReplicants(final Map<String, Integer> tieredReplicants, boolean useDefaultTierForNull)
+  private static Map<String, Integer> handleNullTieredReplicants(
+      final Map<String, Integer> tieredReplicants,
+      boolean useDefaultTierForNull
+  )
   {
     if (useDefaultTierForNull) {
-      return Configs.valueOrDefault(tieredReplicants, ImmutableMap.of(DruidServer.DEFAULT_TIER, DruidServer.DEFAULT_NUM_REPLICANTS));
+      return Configs.valueOrDefault(
+          tieredReplicants,
+          ImmutableMap.of(DruidServer.DEFAULT_TIER, DruidServer.DEFAULT_NUM_REPLICANTS)
+      );
     } else {
       return Configs.valueOrDefault(tieredReplicants, ImmutableMap.of());
     }
@@ -86,10 +108,17 @@ public abstract class LoadRule implements Rule
   {
     for (Map.Entry<String, Integer> entry : tieredReplicants.entrySet()) {
       if (entry.getValue() == null) {
-        throw InvalidInput.exception("Invalid number of replicas for tier [%s]. Value must not be null.", entry.getKey());
+        throw InvalidInput.exception(
+            "Invalid number of replicas for tier [%s]. Value must not be null.",
+            entry.getKey()
+        );
       }
       if (entry.getValue() < 0) {
-        throw InvalidInput.exception("Invalid number of replicas for tier [%s]. Value [%d] must be positive.", entry.getKey(), entry.getValue());
+        throw InvalidInput.exception(
+            "Invalid number of replicas for tier [%s]. Value [%d] must be positive.",
+            entry.getKey(),
+            entry.getValue()
+        );
       }
     }
   }
diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
index 301d9631b7d..e362745a72a 100644
--- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
@@ -96,6 +96,7 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 /**
+ * HTTP resource rooted at the {@code /druid/coordinator/v1/datasources} path.
  */
 @Path("/druid/coordinator/v1/datasources")
 public class DataSourcesResource
@@ -186,7 +187,8 @@ public class DataSourcesResource
   @ResourceFilters(DatasourceResourceFilter.class)
   public Response markAsUsedAllNonOvershadowedSegments(@PathParam("dataSourceName") final String dataSourceName)
   {
-    MarkSegments markSegments = () -> segmentsMetadataManager.markAsUsedAllNonOvershadowedSegmentsInDataSource(dataSourceName);
+    MarkSegments markSegments = () -> segmentsMetadataManager.markAsUsedAllNonOvershadowedSegmentsInDataSource(
+        dataSourceName);
     return doMarkSegments("markAsUsedAllNonOvershadowedSegments", dataSourceName, markSegments);
   }
 
@@ -480,7 +482,8 @@ public class DataSourcesResource
       return Response.ok(
           ImmutableMap.of(
               dataSourceName,
-              100 * ((double) (segmentsLoadStatistics.getNumLoadedSegments()) / (double) segmentsLoadStatistics.getNumPublishedSegments())
+              100 * ((double) (segmentsLoadStatistics.getNumLoadedSegments())
+                     / (double) segmentsLoadStatistics.getNumPublishedSegments())
           )
       ).build();
     }
@@ -873,16 +876,14 @@ public class DataSourcesResource
       final DateTime now = DateTimes.nowUtc();
 
       // A segment that is not eligible for load will never be handed off
-      boolean notEligibleForLoad = true;
+      boolean eligibleForLoad = false;
       for (Rule rule : rules) {
         if (rule.appliesTo(theInterval, now)) {
-          if (rule instanceof LoadRule) {
-            notEligibleForLoad = false;
-          }
+          eligibleForLoad = rule instanceof LoadRule && ((LoadRule) rule).shouldMatchingSegmentBeLoaded();
           break;
         }
       }
-      if (notEligibleForLoad) {
+      if (!eligibleForLoad) {
         return Response.ok(true).build();
       }
 
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
index 801df8ebd76..1e43d89bdda 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
@@ -114,6 +114,7 @@ public class LoadRuleTest
 
     final DataSegment segment = createDataSegment(DS_WIKI);
     LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1, Tier.T2, 2));
+    Assert.assertTrue(rule.shouldMatchingSegmentBeLoaded());
     CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster);
 
     Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI));
@@ -267,6 +268,7 @@ public class LoadRuleTest
         .build();
 
     LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 0, Tier.T2, 0));
+    Assert.assertFalse(rule.shouldMatchingSegmentBeLoaded());
     CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster);
 
     Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, Tier.T1, DS_WIKI));
@@ -284,7 +286,7 @@ public class LoadRuleTest
 
     final DataSegment segment = createDataSegment(DS_WIKI);
     LoadRule rule = loadForever(ImmutableMap.of("invalidTier", 1, Tier.T1, 1));
-
+    Assert.assertTrue(rule.shouldMatchingSegmentBeLoaded());
     CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster);
     Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI));
     Assert.assertEquals(0L, stats.getSegmentStat(Stats.Segments.ASSIGNED, "invalidTier", DS_WIKI));
@@ -347,6 +349,7 @@ public class LoadRuleTest
         .build();
 
     final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
+    Assert.assertTrue(rule.shouldMatchingSegmentBeLoaded());
     CoordinatorRunStats stats1 = runRuleAndGetStats(rule, dataSegment1, params);
     CoordinatorRunStats stats2 = runRuleAndGetStats(rule, dataSegment2, params);
     CoordinatorRunStats stats3 = runRuleAndGetStats(rule, dataSegment3, params);
@@ -370,6 +373,7 @@ public class LoadRuleTest
 
     // Load rule requires 1 replica on each tier
     LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1, Tier.T2, 1));
+    Assert.assertTrue(rule.shouldMatchingSegmentBeLoaded());
     DataSegment segment = createDataSegment(DS_WIKI);
     CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster);
 
@@ -427,7 +431,7 @@ public class LoadRuleTest
 
     DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, segment1, segment2);
     final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 0));
-
+    Assert.assertFalse(rule.shouldMatchingSegmentBeLoaded());
     CoordinatorRunStats stats = runRuleAndGetStats(rule, segment1, params);
     Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED, Tier.T1, segment1.getDataSource()));
     Assert.assertTrue(server1.getPeon().getSegmentsToDrop().contains(segment1));
@@ -531,6 +535,7 @@ public class LoadRuleTest
   {
     EqualsVerifier.forClass(LoadRule.class)
                   .withNonnullFields("tieredReplicants")
+                  .withIgnoredFields("shouldSegmentBeLoaded")
                   .usingGetClass()
                   .verify();
   }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/PeriodLoadRuleTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/PeriodLoadRuleTest.java
index 7b1dd2085f0..86ef92a3ed5 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/rules/PeriodLoadRuleTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/PeriodLoadRuleTest.java
@@ -280,6 +280,7 @@ public class PeriodLoadRuleTest
   {
     EqualsVerifier.forClass(PeriodLoadRule.class)
                   .withNonnullFields("tieredReplicants")
+                  .withIgnoredFields("shouldSegmentBeLoaded")
                   .usingGetClass()
                   .verify();
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org