You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by pr...@apache.org on 2023/03/09 05:38:14 UTC

[druid] branch master updated: Fix for OOM in the Tombstone generating logic in MSQ (#13893)

This is an automated email from the ASF dual-hosted git repository.

progers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new dc67296e9d Fix for OOM in the Tombstone generating logic in MSQ (#13893)
dc67296e9d is described below

commit dc67296e9d0adfb65f31a96d2653e0eb3bd40f1e
Author: Laksh Singla <la...@gmail.com>
AuthorDate: Thu Mar 9 11:08:08 2023 +0530

    Fix for OOM in the Tombstone generating logic in MSQ (#13893)
    
    fix OOMs using a different logic for generating tombstones
    
    ---------
    
    Co-authored-by: Paul Rogers <pa...@users.noreply.github.com>
---
 .../org/apache/druid/msq/exec/MSQReplaceTest.java  |   4 +-
 .../task/batch/parallel/TombstoneHelper.java       |  44 +++----
 .../task/batch/parallel/TombstoneHelperTest.java   | 127 ++++++++++++++++++++-
 3 files changed, 149 insertions(+), 26 deletions(-)

diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index 02c559ee25..271658e041 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -748,9 +748,7 @@ public class MSQReplaceTest extends MSQTestBase
                      .setExpectedShardSpec(DimensionRangeShardSpec.class)
                      .setExpectedTombstoneIntervals(
                          ImmutableSet.of(
-                             Intervals.of("2001-04-01/P3M"),
-                             Intervals.of("2001-07-01/P3M"),
-                             Intervals.of("2001-10-01/P3M")
+                             Intervals.of("2001-04-01/2002-01-01")
                          )
                      )
                      .setExpectedResultRows(expectedResults)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java
index 6ad3b22acc..0c915d0025 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java
@@ -20,8 +20,6 @@
 package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Sets;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.actions.LockListAction;
 import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
@@ -30,13 +28,13 @@ import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.java.util.common.granularity.Granularity;
-import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.ShardSpec;
 import org.apache.druid.timeline.partition.TombstoneShardSpec;
+import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import java.io.IOException;
@@ -189,8 +187,6 @@ public class TombstoneHelper
     for (Interval intervalToDrop : intervalsToDrop) {
       for (Interval usedInterval : usedIntervals) {
 
-        // Overlap will always be finite (not starting from -Inf or ending at +Inf) and lesser than or
-        // equal to the size of the usedInterval
         Interval overlap = intervalToDrop.overlap(usedInterval);
 
         // No overlap of the dropped segment with the used interval due to which we donot need to generate any tombstone
@@ -199,26 +195,34 @@ public class TombstoneHelper
         }
 
         // Overlap might not be aligned with the granularity if the used interval is not aligned with the granularity
-        // However when fetching from the iterator, the first interval is found using the bucketStart, which
-        // ensures that the interval is "rounded down" to the first timestamp that aligns with the granularity
-        // Also, the interval would always be contained inside the "intervalToDrop" because the original REPLACE
-        // is aligned by the granularity, and by extension all the elements inside the intervals to drop would
-        // also be aligned by the same granularity (since intervalsToDrop = replaceIntervals - publishIntervals, and
-        // the right-hand side is always aligned)
-        //
+        // However we align the boundaries manually, in the following code.
+
+        // If the start is aligned, then bucketStart is idempotent, else it will return the latest timestamp less than
+        // overlap.getStart() which aligns with the replace granularity. That extra interval that we are including
+        // before the overlap should be contained in intervalToDrop because intervalToDrop is aligned by the
+        // replaceGranularity, and the overlap's beginning can never be earlier than intervalToDrop's start (trivially,
+        // because it's the overlap), so when bucketStart floors the overlap's beginning, it cannot floor it before
+        // the intervalToDrop's start
+        DateTime alignedIntervalStart = replaceGranularity.bucketStart(overlap.getStart());
+
         // For example, if the replace granularity is DAY, intervalsToReplace are 20/02/2023 - 24/02/2023 (always
         // aligned with the replaceGranularity), intervalsToDrop are 22/02/2023 - 24/02/2023 (they must also be aligned with the replaceGranularity)
         // If the relevant usedIntervals for the datasource are from 22/02/2023 01:00:00 - 23/02/2023 02:00:00, then
         // the overlap would be 22/02/2023 01:00:00 - 23/02/2023 02:00:00. When iterating over the overlap we will get
         // the intervals from 22/02/2023 - 23/02/2023, and 23/02/2023 - 24/02/2023
-        IntervalsByGranularity intervalsToDropByGranularity = new IntervalsByGranularity(
-            ImmutableList.of(overlap),
-            replaceGranularity
-        );
-
-        // Helps in deduplication if required. Since all the intervals are uniformly granular, there should be no
-        // no overlap post deduplication
-        retVal.addAll(Sets.newHashSet(intervalsToDropByGranularity.granularityIntervalsIterator()));
+
+        // If the end is aligned, then we do not alter it, else we align the end by getting the earliest time later
+        // than the overlap's end which aligns with the replace granularity. Using the above-mentioned logic for the
+        // start time, we can also argue that the rounded up end would be contained in the intervalToDrop
+        DateTime alignedIntervalEnd;
+        if (replaceGranularity.bucketStart(overlap.getEnd()).equals(overlap.getEnd())) { // Check if the end is aligned
+          alignedIntervalEnd = overlap.getEnd();
+        } else {
+          alignedIntervalEnd = replaceGranularity.bucketEnd(overlap.getEnd());
+        }
+        Interval alignedTombstoneInterval = new Interval(alignedIntervalStart, alignedIntervalEnd);
+
+        retVal.add(alignedTombstoneInterval);
       }
     }
     return retVal;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java
index 3450bf2703..c5977efaa8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java
@@ -147,7 +147,7 @@ public class TombstoneHelperTest
         replaceGranularity
     );
     Assert.assertEquals(
-        ImmutableSet.of(Intervals.of("2020-03-05/2020-03-06"), Intervals.of("2020-03-06/2020-03-07")),
+        ImmutableSet.of(Intervals.of("2020-03-05/2020-03-07")),
         tombstoneIntervals
     );
   }
@@ -183,8 +183,7 @@ public class TombstoneHelperTest
     Assert.assertEquals(
         ImmutableSet.of(
             Intervals.of("2020-03-01/2020-04-01"),
-            Intervals.of("2020-07-01/2020-08-01"),
-            Intervals.of("2020-08-01/2020-09-01")
+            Intervals.of("2020-07-01/2020-09-01")
         ),
         tombstoneIntervals
     );
@@ -248,6 +247,128 @@ public class TombstoneHelperTest
     Assert.assertEquals(ImmutableSet.of(), tombstoneIntervals);
   }
 
+  @Test
+  public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesOnLeft() throws IOException
+  {
+    Interval usedInterval = Intervals.of("2020-01-01/2020-12-31");
+    Interval replaceInterval = Intervals.of("2020-01-01/2020-12-31");
+    Interval intervalToDrop = Intervals.of("2020-02-01/2020-12-31");
+    Granularity replaceGranularity = Granularities.DAY;
+
+    DataSegment existingUsedSegment =
+        DataSegment.builder()
+                   .dataSource("test")
+                   .interval(usedInterval)
+                   .version("oldVersion")
+                   .size(100)
+                   .build();
+    Assert.assertFalse(existingUsedSegment.isTombstone());
+    Mockito.when(taskActionClient.submit(any(TaskAction.class)))
+           .thenReturn(Collections.singletonList(existingUsedSegment));
+    TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient);
+
+    Set<Interval> tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace(
+        ImmutableList.of(intervalToDrop),
+        ImmutableList.of(replaceInterval),
+        "test",
+        replaceGranularity
+    );
+    Assert.assertEquals(ImmutableSet.of(Intervals.of("2020-02-01/2020-12-31")), tombstoneIntervals);
+  }
+
+  @Test
+  public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesOnRight() throws IOException
+  {
+    Interval usedInterval = Intervals.of("2020-01-01/2020-12-31");
+    Interval replaceInterval = Intervals.of("2020-01-01/2020-12-31");
+    Interval intervalToDrop = Intervals.of("2020-01-01/2020-11-30");
+    Granularity replaceGranularity = Granularities.DAY;
+
+    DataSegment existingUsedSegment =
+        DataSegment.builder()
+                   .dataSource("test")
+                   .interval(usedInterval)
+                   .version("oldVersion")
+                   .size(100)
+                   .build();
+    Assert.assertFalse(existingUsedSegment.isTombstone());
+    Mockito.when(taskActionClient.submit(any(TaskAction.class)))
+           .thenReturn(Collections.singletonList(existingUsedSegment));
+    TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient);
+
+    Set<Interval> tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace(
+        ImmutableList.of(intervalToDrop),
+        ImmutableList.of(replaceInterval),
+        "test",
+        replaceGranularity
+    );
+    Assert.assertEquals(ImmutableSet.of(Intervals.of("2020-01-01/2020-11-30")), tombstoneIntervals);
+  }
+
+  @Test
+  public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesInMiddle() throws IOException
+  {
+    Interval usedInterval = Intervals.of("2020-01-01/2020-12-31");
+    Interval replaceInterval = Intervals.of("2020-01-01/2020-12-31");
+    List<Interval> intervalsToDrop = ImmutableList.of(
+        Intervals.of("2020-01-01/2020-11-30"),
+        Intervals.of("2020-12-05/2020-12-30")
+    );
+    Granularity replaceGranularity = Granularities.DAY;
+
+    DataSegment existingUsedSegment =
+        DataSegment.builder()
+                   .dataSource("test")
+                   .interval(usedInterval)
+                   .version("oldVersion")
+                   .size(100)
+                   .build();
+    Assert.assertFalse(existingUsedSegment.isTombstone());
+    Mockito.when(taskActionClient.submit(any(TaskAction.class)))
+           .thenReturn(Collections.singletonList(existingUsedSegment));
+    TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient);
+
+    Set<Interval> tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace(
+        intervalsToDrop,
+        ImmutableList.of(replaceInterval),
+        "test",
+        replaceGranularity
+    );
+    Assert.assertEquals(
+        ImmutableSet.of(Intervals.of("2020-01-01/2020-11-30"), Intervals.of("2020-12-05/2020-12-30")),
+        tombstoneIntervals
+    );
+  }
+
+  @Test
+  public void testTombstoneIntervalsCreatedForReplaceWhenExistingGranularityIsEternity() throws IOException
+  {
+    Interval usedInterval = Intervals.ETERNITY;
+    Interval replaceInterval = Intervals.ETERNITY;
+    List<Interval> intervalsToDrop = ImmutableList.of(Intervals.of("2020-01-01/2020-11-30"));
+    Granularity replaceGranularity = Granularities.DAY;
+
+    DataSegment existingUsedSegment =
+        DataSegment.builder()
+                   .dataSource("test")
+                   .interval(usedInterval)
+                   .version("oldVersion")
+                   .size(100)
+                   .build();
+    Assert.assertFalse(existingUsedSegment.isTombstone());
+    Mockito.when(taskActionClient.submit(any(TaskAction.class)))
+           .thenReturn(Collections.singletonList(existingUsedSegment));
+    TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient);
+
+    Set<Interval> tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace(
+        intervalsToDrop,
+        ImmutableList.of(replaceInterval),
+        "test",
+        replaceGranularity
+    );
+    Assert.assertEquals(ImmutableSet.of(Intervals.of("2020-01-01/2020-11-30")), tombstoneIntervals);
+  }
+
   @Test
   public void testTombstoneSegmentsForReplaceWhenLockRevoked() throws IOException
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org