You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ka...@apache.org on 2023/03/10 01:29:58 UTC

[druid] branch master updated: Improve documentation for tombstone generation and minor improvement (#13907)

This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c16d9da35a Improve documentation for tombstone generation and minor improvement (#13907)
c16d9da35a is described below

commit c16d9da35ae87a111a19728354b09a3b4610a107
Author: Laksh Singla <la...@gmail.com>
AuthorDate: Fri Mar 10 06:59:51 2023 +0530

    Improve documentation for tombstone generation and minor improvement (#13907)
    
    * As a follow up to #13893, this PR improves the comments added along with examples for the code, as well as adds handling for an edge case where the generated tombstone boundaries were overshooting the bounds of MIN_TIME (or MAX_TIME).
---
 .../task/batch/parallel/TombstoneHelper.java       | 45 ++++++++++++++--------
 .../task/batch/parallel/TombstoneHelperTest.java   | 39 +++++++++++++++++++
 2 files changed, 68 insertions(+), 16 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java
index 0c915d0025..61766cab77 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java
@@ -26,6 +26,7 @@ import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.segment.indexing.DataSchema;
@@ -93,7 +94,7 @@ public class TombstoneHelper
     List<Interval> retVal = new ArrayList<>();
     GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
     List<Interval> pushedSegmentsIntervals = getCondensedPushedSegmentsIntervals(pushedSegments);
-    List<Interval> intervalsForUsedSegments = getCondensedUsedIntervals(
+    List<Interval> intervalsForUsedSegments = getExistingNonEmptyIntervalsOfDatasource(
         dataSchema.getGranularitySpec().inputIntervals(),
         dataSchema.getDataSource()
     );
@@ -165,11 +166,12 @@ public class TombstoneHelper
   }
 
   /**
-   * @param intervalsToDrop Empty intervals in the query that need to be dropped. They should be aligned with the
-   *                        replaceGranularity
+   * See the method body for an example and an in-depth explanation of how the replace interval is created.
+   * @param intervalsToDrop    Empty intervals in the query that need to be dropped. They should be aligned with the
+   *                           replaceGranularity
    * @param intervalsToReplace Intervals in the query which are eligible for replacement with new data.
    *                           They should be aligned with the replaceGranularity
-   * @param dataSource Datasource on which the replace is to be performed
+   * @param dataSource         Datasource on which the replace is to be performed
    * @param replaceGranularity Granularity of the replace query
    * @return Intervals computed for the tombstones
    * @throws IOException
@@ -182,7 +184,7 @@ public class TombstoneHelper
   ) throws IOException
   {
     Set<Interval> retVal = new HashSet<>();
-    List<Interval> usedIntervals = getCondensedUsedIntervals(intervalsToReplace, dataSource);
+    List<Interval> usedIntervals = getExistingNonEmptyIntervalsOfDatasource(intervalsToReplace, dataSource);
 
     for (Interval intervalToDrop : intervalsToDrop) {
       for (Interval usedInterval : usedIntervals) {
@@ -194,22 +196,29 @@ public class TombstoneHelper
           continue;
         }
 
-        // Overlap might not be aligned with the granularity if the used interval is not aligned with the granularity
-        // However we align the boundaries manually, in the following code.
+        // "overlap" might not be aligned with the replaceGranularity if the used interval is not aligned with the
+        // granularity of the REPLACE, i.e. the datasource's original granularity and the replace's granularity differ.
+
+        // However, we align the boundaries of the overlap with the replaceGranularity manually, in the following code.
 
-        // If the start is aligned, then bucketStart is idempotent, else it will return the latest timestamp less than
-        // overlap.getStart() which aligns with the replace granularity. That extra interval that we are including
-        // before the overlap should be contained in intervalToDrop because intervalToDrop is aligned by the
-        // replaceGranularity, and the overlap's beginning would always be later than intervalToDrop (trivially,
-        // because its the overlap) and if bucketStart floors the overlap beginning, it cannot floor it before
-        // the intervalToDrop's start
         DateTime alignedIntervalStart = replaceGranularity.bucketStart(overlap.getStart());
+        long alignedIntervalStartMillis = Math.max(alignedIntervalStart.getMillis(), JodaUtils.MIN_INSTANT);
+        // If the start is already aligned, then 'bucketStart()' returns it unchanged.
+        // Else 'bucketStart()' will return the latest timestamp before overlap.getStart() which aligns with the REPLACE granularity.
+
+        // That extra interval that we are adding before the overlap should be contained in 'intervalToDrop' because
+        // intervalToDrop is aligned by the replaceGranularity.
+        // If the drop's interval is n, then the extra interval would start from n + 1 (where 1 denotes the replaceGranularity)
+        // The overlap's beginning can never be earlier than intervalToDrop's start (trivially,
+        // because it is the overlap), so even if bucketStart floors the overlap's beginning, it cannot floor it
+        // before the intervalToDrop's start
 
         // For example, if the replace granularity is DAY, intervalsToReplace are 20/02/2023 - 24/02/2023 (always
         // aligned with the replaceGranularity), intervalsToDrop are 22/02/2023 - 24/02/2023 (they must also be aligned with the replaceGranularity)
         // If the relevant usedIntervals for the datasource are from 22/02/2023 01:00:00 - 23/02/2023 02:00:00, then
         // the overlap would be 22/02/2023 01:00:00 - 23/02/2023 02:00:00. When iterating over the overlap we will get
-        // the intervals from 22/02/2023 - 23/02/2023, and 23/02/2023 - 24/02/2023
+        // the interval 22/02/2023 01:00:00 - 23/02/2023 02:00:00. After aligning, it would become
+        // 22/02/2023T00:00:00Z - 24/02/2023T00:00:00Z (end exclusive)
 
         // If the end is aligned, then we do not alter it, else we align the end by getting the earliest time later
         // than the overlap's end which aligns with the replace granularity. Using the above-mentioned logic for the
@@ -220,7 +229,8 @@ public class TombstoneHelper
         } else {
           alignedIntervalEnd = replaceGranularity.bucketEnd(overlap.getEnd());
         }
-        Interval alignedTombstoneInterval = new Interval(alignedIntervalStart, alignedIntervalEnd);
+        long alignedIntervalEndMillis = Math.min(alignedIntervalEnd.getMillis(), JodaUtils.MAX_INSTANT);
+        Interval alignedTombstoneInterval = Intervals.utc(alignedIntervalStartMillis, alignedIntervalEndMillis);
 
         retVal.add(alignedTombstoneInterval);
       }
@@ -259,13 +269,16 @@ public class TombstoneHelper
   /**
    * Helper method to prune required tombstones. Only tombstones that cover used intervals will be created
    * since those that do not cover used intervals will be redundant.
+   * Example:
+   * For a datasource having segments for 2020-01-01/2020-12-31 and 2022-01-01/2022-12-31, this method would return
+   * the interval 2020-01-01/2020-12-31 if the input intervals cover the period between 2019 and 2021.
    *
    * @param inputIntervals   Intervals corresponding to the task
    * @param dataSource       Datasource corresponding to the task
    * @return Intervals corresponding to used segments that overlap with any of the spec's input intervals
    * @throws IOException If used segments cannot be retrieved
    */
-  private List<Interval> getCondensedUsedIntervals(
+  private List<Interval> getExistingNonEmptyIntervalsOfDatasource(
       List<Interval> inputIntervals,
       String dataSource
   ) throws IOException
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java
index c5977efaa8..0ba6ad9fd1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java
@@ -27,6 +27,7 @@ import org.apache.druid.indexing.common.actions.TaskAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.segment.indexing.DataSchema;
@@ -369,6 +370,44 @@ public class TombstoneHelperTest
     Assert.assertEquals(ImmutableSet.of(Intervals.of("2020-01-01/2020-11-30")), tombstoneIntervals);
   }
 
+  @Test
+  public void testTombstoneIntervalsCreatedForReplaceWhenReplaceAll() throws IOException
+  {
+    Interval usedInterval = Intervals.ETERNITY;
+    Interval replaceInterval = Intervals.ETERNITY;
+    List<Interval> intervalsToDrop = ImmutableList.of(
+        Intervals.utc(JodaUtils.MIN_INSTANT, 10000),
+        Intervals.utc(100000, JodaUtils.MAX_INSTANT)
+    );
+    Granularity replaceGranularity = Granularities.DAY;
+
+    DataSegment existingUsedSegment =
+        DataSegment.builder()
+                   .dataSource("test")
+                   .interval(usedInterval)
+                   .version("oldVersion")
+                   .size(100)
+                   .build();
+    Assert.assertFalse(existingUsedSegment.isTombstone());
+    Mockito.when(taskActionClient.submit(any(TaskAction.class)))
+           .thenReturn(Collections.singletonList(existingUsedSegment));
+    TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient);
+
+    Set<Interval> tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace(
+        intervalsToDrop,
+        ImmutableList.of(replaceInterval),
+        "test",
+        replaceGranularity
+    );
+    Assert.assertEquals(
+        ImmutableSet.of(
+            Intervals.of("-146136543-09-08T08:23:32.096Z/1970-01-02T00:00:00.000Z"),
+            Intervals.of("1970-01-01T00:00:00.000Z/146140482-04-24T15:36:27.903Z")
+        ),
+        tombstoneIntervals
+    );
+  }
+
   @Test
   public void testTombstoneSegmentsForReplaceWhenLockRevoked() throws IOException
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org