You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by pr...@apache.org on 2023/03/09 05:38:14 UTC
[druid] branch master updated: Fix for OOM in the Tombstone generating logic in MSQ (#13893)
This is an automated email from the ASF dual-hosted git repository.
progers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new dc67296e9d Fix for OOM in the Tombstone generating logic in MSQ (#13893)
dc67296e9d is described below
commit dc67296e9d0adfb65f31a96d2653e0eb3bd40f1e
Author: Laksh Singla <la...@gmail.com>
AuthorDate: Thu Mar 9 11:08:08 2023 +0530
Fix for OOM in the Tombstone generating logic in MSQ (#13893)
Fix OOMs by using different logic to generate tombstones
---------
Co-authored-by: Paul Rogers <pa...@users.noreply.github.com>
---
.../org/apache/druid/msq/exec/MSQReplaceTest.java | 4 +-
.../task/batch/parallel/TombstoneHelper.java | 44 +++----
.../task/batch/parallel/TombstoneHelperTest.java | 127 ++++++++++++++++++++-
3 files changed, 149 insertions(+), 26 deletions(-)
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index 02c559ee25..271658e041 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -748,9 +748,7 @@ public class MSQReplaceTest extends MSQTestBase
.setExpectedShardSpec(DimensionRangeShardSpec.class)
.setExpectedTombstoneIntervals(
ImmutableSet.of(
- Intervals.of("2001-04-01/P3M"),
- Intervals.of("2001-07-01/P3M"),
- Intervals.of("2001-10-01/P3M")
+ Intervals.of("2001-04-01/2002-01-01")
)
)
.setExpectedResultRows(expectedResults)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java
index 6ad3b22acc..0c915d0025 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelper.java
@@ -20,8 +20,6 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Sets;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
@@ -30,13 +28,13 @@ import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
-import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.TombstoneShardSpec;
+import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.IOException;
@@ -189,8 +187,6 @@ public class TombstoneHelper
for (Interval intervalToDrop : intervalsToDrop) {
for (Interval usedInterval : usedIntervals) {
- // Overlap will always be finite (not starting from -Inf or ending at +Inf) and lesser than or
- // equal to the size of the usedInterval
Interval overlap = intervalToDrop.overlap(usedInterval);
// No overlap of the dropped segment with the used interval due to which we donot need to generate any tombstone
@@ -199,26 +195,34 @@ public class TombstoneHelper
}
// Overlap might not be aligned with the granularity if the used interval is not aligned with the granularity
- // However when fetching from the iterator, the first interval is found using the bucketStart, which
- // ensures that the interval is "rounded down" to the first timestamp that aligns with the granularity
- // Also, the interval would always be contained inside the "intervalToDrop" because the original REPLACE
- // is aligned by the granularity, and by extension all the elements inside the intervals to drop would
- // also be aligned by the same granularity (since intervalsToDrop = replaceIntervals - publishIntervals, and
- // the right-hand side is always aligned)
- //
+ // However we align the boundaries manually, in the following code.
+
+ // If the start is aligned, then bucketStart is idempotent, else it will return the latest timestamp less than
+ // overlap.getStart() which aligns with the replace granularity. That extra interval that we are including
+ // before the overlap should be contained in intervalToDrop because intervalToDrop is aligned by the
+ // replaceGranularity, and the overlap's beginning can never be earlier than intervalToDrop's start (trivially,
+ // because it's the overlap), so if bucketStart floors the overlap beginning, it cannot floor it before
+ // the intervalToDrop's start
+ DateTime alignedIntervalStart = replaceGranularity.bucketStart(overlap.getStart());
+
// For example, if the replace granularity is DAY, intervalsToReplace are 20/02/2023 - 24/02/2023 (always
// aligned with the replaceGranularity), intervalsToDrop are 22/02/2023 - 24/02/2023 (they must also be aligned with the replaceGranularity)
// If the relevant usedIntervals for the datasource are from 22/02/2023 01:00:00 - 23/02/2023 02:00:00, then
// the overlap would be 22/02/2023 01:00:00 - 23/02/2023 02:00:00. When iterating over the overlap we will get
// the intervals from 22/02/2023 - 23/02/2023, and 23/02/2023 - 24/02/2023
- IntervalsByGranularity intervalsToDropByGranularity = new IntervalsByGranularity(
- ImmutableList.of(overlap),
- replaceGranularity
- );
-
- // Helps in deduplication if required. Since all the intervals are uniformly granular, there should be no
- // no overlap post deduplication
- retVal.addAll(Sets.newHashSet(intervalsToDropByGranularity.granularityIntervalsIterator()));
+
+ // If the end is aligned, then we do not alter it, else we align the end by getting the earliest time later
+ // than the overlap's end which aligns with the replace granularity. Using the above-mentioned logic for the
+ // start time, we can also argue that the rounded up end would be contained in the intervalToDrop
+ DateTime alignedIntervalEnd;
+ if (replaceGranularity.bucketStart(overlap.getEnd()).equals(overlap.getEnd())) { // Check if the end is aligned
+ alignedIntervalEnd = overlap.getEnd();
+ } else {
+ alignedIntervalEnd = replaceGranularity.bucketEnd(overlap.getEnd());
+ }
+ Interval alignedTombstoneInterval = new Interval(alignedIntervalStart, alignedIntervalEnd);
+
+ retVal.add(alignedTombstoneInterval);
}
}
return retVal;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java
index 3450bf2703..c5977efaa8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TombstoneHelperTest.java
@@ -147,7 +147,7 @@ public class TombstoneHelperTest
replaceGranularity
);
Assert.assertEquals(
- ImmutableSet.of(Intervals.of("2020-03-05/2020-03-06"), Intervals.of("2020-03-06/2020-03-07")),
+ ImmutableSet.of(Intervals.of("2020-03-05/2020-03-07")),
tombstoneIntervals
);
}
@@ -183,8 +183,7 @@ public class TombstoneHelperTest
Assert.assertEquals(
ImmutableSet.of(
Intervals.of("2020-03-01/2020-04-01"),
- Intervals.of("2020-07-01/2020-08-01"),
- Intervals.of("2020-08-01/2020-09-01")
+ Intervals.of("2020-07-01/2020-09-01")
),
tombstoneIntervals
);
@@ -248,6 +247,128 @@ public class TombstoneHelperTest
Assert.assertEquals(ImmutableSet.of(), tombstoneIntervals);
}
+ @Test
+ public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesOnLeft() throws IOException
+ {
+ Interval usedInterval = Intervals.of("2020-01-01/2020-12-31");
+ Interval replaceInterval = Intervals.of("2020-01-01/2020-12-31");
+ Interval intervalToDrop = Intervals.of("2020-02-01/2020-12-31");
+ Granularity replaceGranularity = Granularities.DAY;
+
+ DataSegment existingUsedSegment =
+ DataSegment.builder()
+ .dataSource("test")
+ .interval(usedInterval)
+ .version("oldVersion")
+ .size(100)
+ .build();
+ Assert.assertFalse(existingUsedSegment.isTombstone());
+ Mockito.when(taskActionClient.submit(any(TaskAction.class)))
+ .thenReturn(Collections.singletonList(existingUsedSegment));
+ TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient);
+
+ Set<Interval> tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace(
+ ImmutableList.of(intervalToDrop),
+ ImmutableList.of(replaceInterval),
+ "test",
+ replaceGranularity
+ );
+ Assert.assertEquals(ImmutableSet.of(Intervals.of("2020-02-01/2020-12-31")), tombstoneIntervals);
+ }
+
+ @Test
+ public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesOnRight() throws IOException
+ {
+ Interval usedInterval = Intervals.of("2020-01-01/2020-12-31");
+ Interval replaceInterval = Intervals.of("2020-01-01/2020-12-31");
+ Interval intervalToDrop = Intervals.of("2020-01-01/2020-11-30");
+ Granularity replaceGranularity = Granularities.DAY;
+
+ DataSegment existingUsedSegment =
+ DataSegment.builder()
+ .dataSource("test")
+ .interval(usedInterval)
+ .version("oldVersion")
+ .size(100)
+ .build();
+ Assert.assertFalse(existingUsedSegment.isTombstone());
+ Mockito.when(taskActionClient.submit(any(TaskAction.class)))
+ .thenReturn(Collections.singletonList(existingUsedSegment));
+ TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient);
+
+ Set<Interval> tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace(
+ ImmutableList.of(intervalToDrop),
+ ImmutableList.of(replaceInterval),
+ "test",
+ replaceGranularity
+ );
+ Assert.assertEquals(ImmutableSet.of(Intervals.of("2020-01-01/2020-11-30")), tombstoneIntervals);
+ }
+
+ @Test
+ public void testTombstoneIntervalsCreatedForReplaceWhenDataLiesInMiddle() throws IOException
+ {
+ Interval usedInterval = Intervals.of("2020-01-01/2020-12-31");
+ Interval replaceInterval = Intervals.of("2020-01-01/2020-12-31");
+ List<Interval> intervalsToDrop = ImmutableList.of(
+ Intervals.of("2020-01-01/2020-11-30"),
+ Intervals.of("2020-12-05/2020-12-30")
+ );
+ Granularity replaceGranularity = Granularities.DAY;
+
+ DataSegment existingUsedSegment =
+ DataSegment.builder()
+ .dataSource("test")
+ .interval(usedInterval)
+ .version("oldVersion")
+ .size(100)
+ .build();
+ Assert.assertFalse(existingUsedSegment.isTombstone());
+ Mockito.when(taskActionClient.submit(any(TaskAction.class)))
+ .thenReturn(Collections.singletonList(existingUsedSegment));
+ TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient);
+
+ Set<Interval> tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace(
+ intervalsToDrop,
+ ImmutableList.of(replaceInterval),
+ "test",
+ replaceGranularity
+ );
+ Assert.assertEquals(
+ ImmutableSet.of(Intervals.of("2020-01-01/2020-11-30"), Intervals.of("2020-12-05/2020-12-30")),
+ tombstoneIntervals
+ );
+ }
+
+ @Test
+ public void testTombstoneIntervalsCreatedForReplaceWhenExistingGranularityIsEternity() throws IOException
+ {
+ Interval usedInterval = Intervals.ETERNITY;
+ Interval replaceInterval = Intervals.ETERNITY;
+ List<Interval> intervalsToDrop = ImmutableList.of(Intervals.of("2020-01-01/2020-11-30"));
+ Granularity replaceGranularity = Granularities.DAY;
+
+ DataSegment existingUsedSegment =
+ DataSegment.builder()
+ .dataSource("test")
+ .interval(usedInterval)
+ .version("oldVersion")
+ .size(100)
+ .build();
+ Assert.assertFalse(existingUsedSegment.isTombstone());
+ Mockito.when(taskActionClient.submit(any(TaskAction.class)))
+ .thenReturn(Collections.singletonList(existingUsedSegment));
+ TombstoneHelper tombstoneHelper = new TombstoneHelper(taskActionClient);
+
+ Set<Interval> tombstoneIntervals = tombstoneHelper.computeTombstoneIntervalsForReplace(
+ intervalsToDrop,
+ ImmutableList.of(replaceInterval),
+ "test",
+ replaceGranularity
+ );
+ Assert.assertEquals(ImmutableSet.of(Intervals.of("2020-01-01/2020-11-30")), tombstoneIntervals);
+ }
+
@Test
public void testTombstoneSegmentsForReplaceWhenLockRevoked() throws IOException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org