You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/11/01 16:43:37 UTC

[GitHub] jihoonson closed pull request #6414: Period load/drop/broadcast rules should include the future by default

jihoonson closed pull request #6414: Period load/drop/broadcast rules should include the future by default
URL: https://github.com/apache/incubator-druid/pull/6414
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/content/operations/rule-configuration.md b/docs/content/operations/rule-configuration.md
index 404e583663f..f41823c8c2c 100644
--- a/docs/content/operations/rule-configuration.md
+++ b/docs/content/operations/rule-configuration.md
@@ -64,6 +64,7 @@ Period load rules are of the form:
 {
   "type" : "loadByPeriod",
   "period" : "P1M",
+  "includeFuture" : true,
   "tieredReplicants": {
       "hot": 1,
       "_default_tier" : 1
@@ -73,9 +74,10 @@ Period load rules are of the form:
 
 * `type` - this should always be "loadByPeriod"
 * `period` - A JSON Object representing ISO-8601 Periods
+* `includeFuture` - A JSON Boolean indicating whether the load period should include the future. This property is optional; the default is true.
 * `tieredReplicants` - A JSON Object where the keys are the tier names and values are the number of replicas for that tier.
 
-The interval of a segment will be compared against the specified period. The rule matches if the period overlaps the interval.
+The interval of a segment will be compared against the specified period. The period runs from some time in the past either into the future or up to the current time, depending on whether `includeFuture` is true or false. The rule matches if the period *overlaps* the interval.
 
 Drop Rules
 ----------
@@ -120,14 +122,16 @@ Period drop rules are of the form:
 ```json
 {
   "type" : "dropByPeriod",
-  "period" : "P1M"
+  "period" : "P1M",
+  "includeFuture" : true
 }
 ```
 
 * `type` - this should always be "dropByPeriod"
 * `period` - A JSON Object representing ISO-8601 Periods
+* `includeFuture` - A JSON Boolean indicating whether the drop period should include the future. This property is optional; the default is true.
 
-The interval of a segment will be compared against the specified period. The period is from some time in the past to the current time. The rule matches if the period contains the interval.
+The interval of a segment will be compared against the specified period. The period runs from some time in the past either into the future or up to the current time, depending on whether `includeFuture` is true or false. The rule matches if the period *contains* the interval. Note that this drop rule always drops recent data.
 
 Broadcast Rules
 ---------------
@@ -173,15 +177,17 @@ Period broadcast rules are of the form:
 {
   "type" : "broadcastByPeriod",
   "colocatedDataSources" : [ "target_source1", "target_source2" ],
-  "period" : "P1M"
+  "period" : "P1M",
+  "includeFuture" : true
 }
 ```
 
 * `type` - this should always be "broadcastByPeriod"
 * `colocatedDataSources` - A JSON List containing data source names to be co-located. `null` and empty list means broadcasting to every node in the cluster.
 * `period` - A JSON Object representing ISO-8601 Periods
+* `includeFuture` - A JSON Boolean indicating whether the load period should include the future. This property is optional; the default is true.
 
-The interval of a segment will be compared against the specified period. The period is from some time in the past to the current time. The rule matches if the period contains the interval.
+The interval of a segment will be compared against the specified period. The period runs from some time in the past either into the future or up to the current time, depending on whether `includeFuture` is true or false. The rule matches if the period *overlaps* the interval.
 
 <div class="note caution">
 broadcast rules don't guarantee that segments of the data sources are always co-located because segments for the colocated data sources are not loaded together atomically.
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/PeriodBroadcastDistributionRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/PeriodBroadcastDistributionRule.java
index 7d00bf7f413..97c6e11cfba 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/rules/PeriodBroadcastDistributionRule.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/PeriodBroadcastDistributionRule.java
@@ -32,17 +32,21 @@
 public class PeriodBroadcastDistributionRule extends BroadcastDistributionRule
 {
   static final String TYPE = "broadcastByPeriod";
+  private static final boolean DEFAULT_INCLUDE_FUTURE = true;
 
   private final Period period;
+  private final boolean includeFuture;
   private final List<String> colocatedDataSources;
 
   @JsonCreator
   public PeriodBroadcastDistributionRule(
       @JsonProperty("period") Period period,
+      @JsonProperty("includeFuture") Boolean includeFuture,
       @JsonProperty("colocatedDataSources") List<String> colocatedDataSources
   )
   {
     this.period = period;
+    this.includeFuture = includeFuture == null ? DEFAULT_INCLUDE_FUTURE : includeFuture;
     this.colocatedDataSources = colocatedDataSources;
   }
 
@@ -69,7 +73,7 @@ public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
   @Override
   public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
   {
-    return Rules.eligibleForLoad(period, interval, referenceTimestamp);
+    return Rules.eligibleForLoad(period, interval, referenceTimestamp, includeFuture);
   }
 
   @JsonProperty
@@ -78,6 +82,12 @@ public Period getPeriod()
     return period;
   }
 
+  @JsonProperty
+  public boolean isIncludeFuture()
+  {
+    return includeFuture;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -94,7 +104,9 @@ public boolean equals(Object o)
     if (!Objects.equals(period, that.period)) {
       return false;
     }
-
+    if (includeFuture != that.includeFuture) {
+      return false;
+    }
     return Objects.equals(colocatedDataSources, that.colocatedDataSources);
   }
 
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/PeriodDropRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/PeriodDropRule.java
index f26c3be49df..da17b4c2a9d 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/rules/PeriodDropRule.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/PeriodDropRule.java
@@ -30,14 +30,19 @@
  */
 public class PeriodDropRule extends DropRule
 {
+  private static final boolean DEFAULT_INCLUDE_FUTURE = true;
+
   private final Period period;
+  private final boolean includeFuture;
 
   @JsonCreator
   public PeriodDropRule(
-      @JsonProperty("period") Period period
+      @JsonProperty("period") Period period,
+      @JsonProperty("includeFuture") Boolean includeFuture
   )
   {
     this.period = period;
+    this.includeFuture = includeFuture == null ? DEFAULT_INCLUDE_FUTURE : includeFuture;
   }
 
   @Override
@@ -53,6 +58,12 @@ public Period getPeriod()
     return period;
   }
 
+  @JsonProperty
+  public boolean isIncludeFuture()
+  {
+    return includeFuture;
+  }
+
   @Override
   public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
   {
@@ -63,6 +74,10 @@ public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
   public boolean appliesTo(Interval theInterval, DateTime referenceTimestamp)
   {
     final Interval currInterval = new Interval(period, referenceTimestamp);
-    return currInterval.contains(theInterval);
+    if (includeFuture) {
+      return currInterval.getStartMillis() <= theInterval.getStartMillis();
+    } else {
+      return currInterval.contains(theInterval);
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/PeriodLoadRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/PeriodLoadRule.java
index 2fe36cc14fc..e33a669007c 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/rules/PeriodLoadRule.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/PeriodLoadRule.java
@@ -37,19 +37,23 @@
 public class PeriodLoadRule extends LoadRule
 {
   private static final Logger log = new Logger(PeriodLoadRule.class);
+  static final boolean DEFAULT_INCLUDE_FUTURE = true;
 
   private final Period period;
+  private final boolean includeFuture;
   private final Map<String, Integer> tieredReplicants;
 
   @JsonCreator
   public PeriodLoadRule(
       @JsonProperty("period") Period period,
+      @JsonProperty("includeFuture") Boolean includeFuture,
       @JsonProperty("tieredReplicants") Map<String, Integer> tieredReplicants
   )
   {
     this.tieredReplicants = tieredReplicants == null ? ImmutableMap.of(DruidServer.DEFAULT_TIER, DruidServer.DEFAULT_NUM_REPLICANTS) : tieredReplicants;
     validateTieredReplicants(this.tieredReplicants);
     this.period = period;
+    this.includeFuture = includeFuture == null ? DEFAULT_INCLUDE_FUTURE : includeFuture;
   }
 
   @Override
@@ -65,6 +69,12 @@ public Period getPeriod()
     return period;
   }
 
+  @JsonProperty
+  public boolean isIncludeFuture()
+  {
+    return includeFuture;
+  }
+
   @Override
   @JsonProperty
   public Map<String, Integer> getTieredReplicants()
@@ -88,6 +98,6 @@ public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
   @Override
   public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
   {
-    return Rules.eligibleForLoad(period, interval, referenceTimestamp);
+    return Rules.eligibleForLoad(period, interval, referenceTimestamp, includeFuture);
   }
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/Rules.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/Rules.java
index e5c65d9e850..a8d074b6381 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/rules/Rules.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/Rules.java
@@ -30,10 +30,14 @@ public static boolean eligibleForLoad(Interval src, Interval target)
     return src.overlaps(target);
   }
 
-  public static boolean eligibleForLoad(Period period, Interval interval, DateTime referenceTimestamp)
+  public static boolean eligibleForLoad(Period period, Interval interval, DateTime referenceTimestamp, boolean includeFuture)
   {
     final Interval currInterval = new Interval(period, referenceTimestamp);
-    return eligibleForLoad(currInterval, interval);
+    if (includeFuture) {
+      return currInterval.getStartMillis() < interval.getEndMillis();
+    } else {
+      return eligibleForLoad(currInterval, interval);
+    }
   }
 
   private Rules() {}
diff --git a/server/src/main/resources/static/old-console/js/rules-0.0.2.js b/server/src/main/resources/static/old-console/js/rules-0.0.2.js
index 81e77af4137..a5b0e96ff97 100644
--- a/server/src/main/resources/static/old-console/js/rules-0.0.2.js
+++ b/server/src/main/resources/static/old-console/js/rules-0.0.2.js
@@ -114,6 +114,7 @@ function makeLoadByInterval(rule) {
 function makeLoadByPeriod(rule) {
   var retVal = "";
   retVal += "<span class='rule_label'>period</span><input type='text' name='period' " + "value='" + rule.period + "'/>";
+  retVal += "<span class='rule_label'>includeFuture</span><input type='text' name='includeFuture' " + "value='" + true + "'/>";
   retVal += "<button type='button' class='add_tier'>Add Another Tier</button>";
   if (rule.tieredReplicants === undefined) {
     retVal += makeTierLoad(null, 0);
@@ -148,7 +149,10 @@ function makeDropByInterval(rule) {
 }
 
 function makeDropByPeriod(rule) {
-  return "<span class='rule_label'>period</span><input type='text' name='period' " + "value='" + rule.period + "'/>";
+  var retVal = "";
+  retVal += "<span class='rule_label'>period</span><input type='text' name='period' " + "value='" + rule.period + "'/>";
+  retVal += "<span class='rule_label'>includeFuture</span><input type='text' name='includeFuture' " + "value='" + true + "'/>";
+  return retVal;
 }
 
 function makeJSON() {
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorBalancerProfiler.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorBalancerProfiler.java
index c505657c9af..7a4eb5ddad7 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorBalancerProfiler.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorBalancerProfiler.java
@@ -57,7 +57,7 @@
   Map<String, DataSegment> segments = Maps.newHashMap();
   ServiceEmitter emitter;
   MetadataRuleManager manager;
-  PeriodLoadRule loadRule = new PeriodLoadRule(new Period("P5000Y"), ImmutableMap.of("normal", 3));
+  PeriodLoadRule loadRule = new PeriodLoadRule(new Period("P5000Y"), null, ImmutableMap.of("normal", 3));
   List<Rule> rules = ImmutableList.of(loadRule);
 
   @Before
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleSerdeTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleSerdeTest.java
index 012b189d3e9..5f6cbf53c3b 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleSerdeTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleSerdeTest.java
@@ -51,9 +51,9 @@
         new Object[]{new IntervalBroadcastDistributionRule(Intervals.of("0/1000"), ImmutableList.of("large_source"))},
         new Object[]{new IntervalBroadcastDistributionRule(Intervals.of("0/1000"), ImmutableList.of())},
         new Object[]{new IntervalBroadcastDistributionRule(Intervals.of("0/1000"), null)},
-        new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), ImmutableList.of("large_source"))},
-        new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), ImmutableList.of())},
-        new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null)}
+        new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null, ImmutableList.of("large_source"))},
+        new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null, ImmutableList.of())},
+        new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null, null)}
     );
     return params;
   }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/PeriodDropRuleTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/PeriodDropRuleTest.java
index 16df35dfe43..1e2dafc7dbd 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/rules/PeriodDropRuleTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/PeriodDropRuleTest.java
@@ -42,7 +42,8 @@ public void testAppliesToAll()
   {
     DateTime now = DateTimes.of("2012-12-31T01:00:00");
     PeriodDropRule rule = new PeriodDropRule(
-        new Period("P5000Y")
+        new Period("P5000Y"),
+        false
     );
 
     Assert.assertTrue(
@@ -70,7 +71,8 @@ public void testAppliesToPeriod()
   {
     DateTime now = DateTimes.of("2012-12-31T01:00:00");
     PeriodDropRule rule = new PeriodDropRule(
-        new Period("P1M")
+        new Period("P1M"),
+        false
     );
 
     Assert.assertTrue(
@@ -102,4 +104,31 @@ public void testAppliesToPeriod()
         )
     );
   }
+
+  @Test
+  public void testIncludeFuture()
+  {
+    DateTime now = DateTimes.of("2012-12-31T01:00:00");
+    PeriodDropRule includeFutureRule = new PeriodDropRule(
+        new Period("P2D"),
+        true
+    );
+    PeriodDropRule notIncludeFutureRule = new PeriodDropRule(
+        new Period("P2D"),
+        false
+    );
+
+    Assert.assertTrue(
+        includeFutureRule.appliesTo(
+            builder.interval(new Interval(now.plusDays(1), now.plusDays(2))).build(),
+            now
+        )
+    );
+    Assert.assertFalse(
+        notIncludeFutureRule.appliesTo(
+            builder.interval(new Interval(now.plusDays(1), now.plusDays(2))).build(),
+            now
+        )
+    );
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/PeriodLoadRuleTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/PeriodLoadRuleTest.java
index 63f4ca04ca8..53eaa2ef623 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/rules/PeriodLoadRuleTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/PeriodLoadRuleTest.java
@@ -48,6 +48,7 @@ public void testAppliesToAll()
     DateTime now = DateTimes.of("2013-01-01");
     PeriodLoadRule rule = new PeriodLoadRule(
         new Period("P5000Y"),
+        false,
         ImmutableMap.of("", 0)
     );
 
@@ -62,6 +63,7 @@ public void testAppliesToPeriod()
     DateTime now = DateTimes.of("2012-12-31T01:00:00");
     PeriodLoadRule rule = new PeriodLoadRule(
         new Period("P1M"),
+        false,
         ImmutableMap.of("", 0)
     );
 
@@ -88,6 +90,7 @@ public void testAppliesToPartialOverlap()
     DateTime now = DateTimes.of("2012-12-31T01:00:00");
     PeriodLoadRule rule = new PeriodLoadRule(
             new Period("P1M"),
+            false,
             ImmutableMap.of("", 0)
     );
 
@@ -108,22 +111,59 @@ public void testAppliesToPartialOverlap()
   }
 
   @Test
-  public void testSerdeNullTieredReplicants() throws Exception
+  public void testIncludeFuture()
+  {
+    DateTime now = DateTimes.of("2012-12-31T01:00:00");
+    PeriodLoadRule includeFutureRule = new PeriodLoadRule(
+        new Period("P2D"),
+        true,
+        ImmutableMap.of("", 0)
+    );
+    PeriodLoadRule notIncludeFutureRule = new PeriodLoadRule(
+        new Period("P2D"),
+        false,
+        ImmutableMap.of("", 0)
+    );
+
+    Assert.assertTrue(
+        includeFutureRule.appliesTo(
+            builder.interval(new Interval(now.plusDays(1), now.plusDays(2))).build(),
+            now
+        )
+    );
+    Assert.assertFalse(
+        notIncludeFutureRule.appliesTo(
+            builder.interval(new Interval(now.plusDays(1), now.plusDays(2))).build(),
+            now
+        )
+    );
+  }
+
+  /**
+   * test serialize/deserialize null values of {@link PeriodLoadRule#tieredReplicants} and {@link PeriodLoadRule#includeFuture}
+   */
+  @Test
+  public void testSerdeNull() throws Exception
   {
     PeriodLoadRule rule = new PeriodLoadRule(
-        new Period("P1D"), null
+        new Period("P1D"), null, null
     );
 
     ObjectMapper jsonMapper = new DefaultObjectMapper();
     Rule reread = jsonMapper.readValue(jsonMapper.writeValueAsString(rule), Rule.class);
 
     Assert.assertEquals(rule.getPeriod(), ((PeriodLoadRule) reread).getPeriod());
+    Assert.assertEquals(rule.isIncludeFuture(), ((PeriodLoadRule) reread).isIncludeFuture());
+    Assert.assertEquals(PeriodLoadRule.DEFAULT_INCLUDE_FUTURE, rule.isIncludeFuture());
     Assert.assertEquals(rule.getTieredReplicants(), ((PeriodLoadRule) reread).getTieredReplicants());
     Assert.assertEquals(ImmutableMap.of(DruidServer.DEFAULT_TIER, DruidServer.DEFAULT_NUM_REPLICANTS), rule.getTieredReplicants());
   }
 
+  /**
+   * test mapping null values of {@link PeriodLoadRule#tieredReplicants} and {@link PeriodLoadRule#includeFuture}
+   */
   @Test
-  public void testMappingNullTieredReplicants() throws Exception
+  public void testMappingNull() throws Exception
   {
     String inputJson = "{\n"
                        + "      \"period\": \"P1D\",\n"
@@ -131,6 +171,7 @@ public void testMappingNullTieredReplicants() throws Exception
                        + "    }";
     String expectedJson = "{\n"
                           + "      \"period\": \"P1D\",\n"
+                          + "      \"includeFuture\": " + PeriodLoadRule.DEFAULT_INCLUDE_FUTURE + ",\n"
                           + "      \"tieredReplicants\": {\n"
                           + "        \"" + DruidServer.DEFAULT_TIER + "\": " + DruidServer.DEFAULT_NUM_REPLICANTS + "\n"
                           + "      },\n"
@@ -141,5 +182,6 @@ public void testMappingNullTieredReplicants() throws Exception
     PeriodLoadRule expectedPeriodLoadRule = jsonMapper.readValue(expectedJson, PeriodLoadRule.class);
     Assert.assertEquals(expectedPeriodLoadRule.getTieredReplicants(), inputPeriodLoadRule.getTieredReplicants());
     Assert.assertEquals(expectedPeriodLoadRule.getPeriod(), inputPeriodLoadRule.getPeriod());
+    Assert.assertEquals(expectedPeriodLoadRule.isIncludeFuture(), inputPeriodLoadRule.isIncludeFuture());
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org