You are viewing a plain-text version of this content. The canonical link to it ("here") was not preserved in this copy.
Posted to commits@druid.apache.org by ma...@apache.org on 2021/03/23 18:22:20 UTC
[druid] branch master updated: Allow overlapping intervals for the compaction task (#10912)
This is an automated email from the ASF dual-hosted git repository.
maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a041933 Allow overlapping intervals for the compaction task (#10912)
a041933 is described below
commit a04193301732b08977b2085f2f858751ab614141
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Tue Mar 23 11:21:54 2021 -0700
Allow overlapping intervals for the compaction task (#10912)
* Allow overlapping intervals for the compaction task
* Remove unused import
* Fix line indentation
Co-authored-by: Maytas Monsereenusorn <ma...@apache.org>
---
.../common/granularity/IntervalsByGranularity.java | 25 +-------
.../util/common/IntervalsByGranularityTest.java | 60 ++++++++++--------
.../druid/indexing/common/task/CompactionTask.java | 6 +-
.../common/task/CompactionTaskRunTest.java | 74 ++++++++++++++++++++++
.../indexing/granularity/BaseGranularitySpec.java | 9 +--
5 files changed, 117 insertions(+), 57 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java b/core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
index 7065535..ff076d4 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
@@ -23,16 +23,12 @@ import com.google.common.collect.FluentIterable;
import org.apache.druid.common.guava.SettableSupplier;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.JodaUtils;
-import org.apache.druid.java.util.common.guava.Comparators;
import org.joda.time.Interval;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Set;
/**
* Produce a stream of intervals generated by a given set of intervals as input and a given
@@ -51,19 +47,7 @@ public class IntervalsByGranularity
*/
public IntervalsByGranularity(Collection<Interval> intervals, Granularity granularity)
{
- // eliminate dups, sort intervals:
- Set<Interval> intervalSet = new HashSet<>(intervals);
- List<Interval> inputIntervals = new ArrayList<>(intervals.size());
- inputIntervals.addAll(intervalSet);
- inputIntervals.sort(Comparators.intervalsByStartThenEnd());
-
- // sanity check
- if (JodaUtils.containOverlappingIntervals(inputIntervals)) {
- throw new IAE("Intervals contain overlapping intervals [%s]", intervals);
- }
-
- // all good:
- sortedNonOverlappingIntervals = inputIntervals;
+ this.sortedNonOverlappingIntervals = JodaUtils.condenseIntervals(intervals);
this.granularity = granularity;
}
@@ -73,9 +57,8 @@ public class IntervalsByGranularity
*/
public Iterator<Interval> granularityIntervalsIterator()
{
- Iterator<Interval> ite;
if (sortedNonOverlappingIntervals.isEmpty()) {
- ite = Collections.emptyIterator();
+ return Collections.emptyIterator();
} else {
// The filter after transform & concat is to remove duplicats.
// This can happen when condense left intervals that did not overlap but
@@ -85,7 +68,7 @@ public class IntervalsByGranularity
// intervals will be returned, both with the same value 2013-01-01T00:00:00.000Z/2013-02-01T00:00:00.000Z.
// Thus dups can be created given the right conditions....
final SettableSupplier<Interval> previous = new SettableSupplier<>();
- ite = FluentIterable.from(sortedNonOverlappingIntervals).transformAndConcat(granularity::getIterable)
+ return FluentIterable.from(sortedNonOverlappingIntervals).transformAndConcat(granularity::getIterable)
.filter(interval -> {
if (previous.get() != null && previous.get().equals(interval)) {
return false;
@@ -94,7 +77,5 @@ public class IntervalsByGranularity
return true;
}).iterator();
}
- return ite;
}
-
}
diff --git a/core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java b/core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java
index a38e6d5..ee01aa0 100644
--- a/core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java
+++ b/core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java
@@ -21,11 +21,13 @@ package org.apache.druid.java.util.common;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
import org.joda.time.Interval;
import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import java.util.Collections;
import java.util.Iterator;
@@ -34,8 +36,8 @@ import java.util.NoSuchElementException;
public class IntervalsByGranularityTest
{
- private static final long SECONDS_IN_YEAR = 31536000;
-
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
@Test
public void testTrivialIntervalExplosion()
@@ -46,17 +48,17 @@ public class IntervalsByGranularityTest
IntervalsByGranularity intervals = new IntervalsByGranularity(
ImmutableList.of(first, second, third),
- Granularity.fromString("DAY")
+ Granularities.DAY
);
// get count:
Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator();
- long count = getCount(granularityIntervals);
- Assert.assertTrue(count == 62 + 365);
+ long count = verifyIteratorAndReturnIntervalCount(granularityIntervals);
+ Assert.assertEquals(62 + 365, count);
granularityIntervals = intervals.granularityIntervalsIterator();
count = getCountWithNoHasNext(granularityIntervals);
- Assert.assertTrue(count == 62 + 365);
+ Assert.assertEquals(62 + 365, count);
}
@@ -69,13 +71,13 @@ public class IntervalsByGranularityTest
IntervalsByGranularity intervals = new IntervalsByGranularity(
ImmutableList.of(first, second, third),
- Granularity.fromString("DAY")
+ Granularities.DAY
);
// get count:
Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator();
- long count = getCount(granularityIntervals);
- Assert.assertTrue(count == 61);
+ long count = verifyIteratorAndReturnIntervalCount(granularityIntervals);
+ Assert.assertEquals(61, count);
}
@@ -88,7 +90,7 @@ public class IntervalsByGranularityTest
Interval first = Intervals.of("2012-01-01T00Z/P1Y");
IntervalsByGranularity intervals = new IntervalsByGranularity(
ImmutableList.of(first),
- Granularity.fromString("SECOND")
+ Granularities.SECOND
);
Assert.assertEquals(
ImmutableList.of(Intervals.of("2012-01-01T00Z/2013-01-01T00Z")),
@@ -96,22 +98,29 @@ public class IntervalsByGranularityTest
);
}
+ /**
+ * This test iterates huge intervals (2.5 years) with the SECOND granularity.
+ * The motivation behind this test is ensuring that IntervalsByGranularity can handle
+ * these huge intervals with a tiny granularity. However, this test takes a long time
+ * to populate all intervals based on the SECOND granularity (more than 1 min), so
+ * is ignored by default. We should make this test not a unit test, but a load test.
+ */
+ @Ignore
@Test
- public void testIntervalExplosion()
+ public void testIterateHugeIntervalsWithTinyGranularity()
{
Interval first = Intervals.of("2012-01-01T00Z/2012-12-31T00Z");
Interval second = Intervals.of("2002-01-01T00Z/2002-12-31T00Z");
Interval third = Intervals.of("2021-01-01T00Z/2021-06-30T00Z");
IntervalsByGranularity intervals = new IntervalsByGranularity(
ImmutableList.of(first, second, third),
- Granularity.fromString("SECOND")
+ Granularities.SECOND
);
// get count:
Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator();
- long count = getCount(granularityIntervals);
- Assert.assertTrue(count == 78537600);
-
+ long count = verifyIteratorAndReturnIntervalCount(granularityIntervals);
+ Assert.assertEquals(78537600, count);
}
@Test
@@ -132,7 +141,6 @@ public class IntervalsByGranularityTest
ImmutableList.of(Intervals.of("2012-01-01T00Z/2012-02-01T00Z")),
ImmutableList.copyOf(intervals.granularityIntervalsIterator())
);
-
}
@Test
@@ -160,10 +168,9 @@ public class IntervalsByGranularityTest
),
ImmutableList.copyOf(intervals.granularityIntervalsIterator())
);
-
}
- @Test(expected = IAE.class)
+ @Test
public void testOverlappingShouldThrow()
{
List<Interval> inputIntervals = ImmutableList.of(
@@ -174,10 +181,13 @@ public class IntervalsByGranularityTest
IntervalsByGranularity intervals = new IntervalsByGranularity(
inputIntervals,
- Granularity.fromString("DAY")
+ Granularities.DAY
);
- }
+ Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator();
+ long count = verifyIteratorAndReturnIntervalCount(granularityIntervals);
+ Assert.assertEquals(14, count);
+ }
@Test
public void testWithGranularity()
@@ -190,13 +200,13 @@ public class IntervalsByGranularityTest
IntervalsByGranularity intervals = new IntervalsByGranularity(
inputIntervals,
- Granularity.fromString("MONTH")
+ Granularities.MONTH
);
// get count:
Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator();
- long count = getCount(granularityIntervals);
- Assert.assertTrue(count == 2);
+ long count = verifyIteratorAndReturnIntervalCount(granularityIntervals);
+ Assert.assertEquals(2, count);
}
@Test(expected = UnsupportedOperationException.class)
@@ -223,7 +233,7 @@ public class IntervalsByGranularityTest
Assert.assertFalse(intervals.granularityIntervalsIterator().hasNext());
}
- private long getCount(Iterator<Interval> granularityIntervalIterator)
+ private long verifyIteratorAndReturnIntervalCount(Iterator<Interval> granularityIntervalIterator)
{
long count = 0;
Interval previous = null;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index bdf250a..8234c35 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -22,6 +22,8 @@ package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -142,8 +144,6 @@ public class CompactionTask extends AbstractBatchIndexTask
@Nullable
private final AggregatorFactory[] metricsSpec;
@Nullable
- private final Granularity segmentGranularity;
- @Nullable
private final ClientCompactionTaskGranularitySpec granularitySpec;
@Nullable
private final ParallelIndexTuningConfig tuningConfig;
@@ -207,7 +207,6 @@ public class CompactionTask extends AbstractBatchIndexTask
this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
this.metricsSpec = metricsSpec;
- this.segmentGranularity = segmentGranularity;
// Prior to apache/druid#10843 users could specify segmentGranularity using `segmentGranularity`
// Now users should prefer to use `granularitySpec`
// In case users accidentally specify both, and they are conflicting, warn the user instead of proceeding
@@ -308,6 +307,7 @@ public class CompactionTask extends AbstractBatchIndexTask
return metricsSpec;
}
+ @JsonInclude(Include.NON_NULL)
@JsonProperty
@Nullable
@Override
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 95f0aaf..f3ee50c 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -56,6 +56,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
@@ -799,6 +800,79 @@ public class CompactionTaskRunTest extends IngestionTestBase
}
@Test
+ public void testPartialIntervalCompactWithFinerSegmentGranularityThenFullIntervalCompact() throws Exception
+ {
+ // This test fails with segment lock because of the bug reported in https://github.com/apache/druid/issues/10911.
+ if (lockGranularity == LockGranularity.SEGMENT) {
+ return;
+ }
+
+ runIndexTask();
+
+ final Set<DataSegment> expectedSegments = new HashSet<>(
+ getStorageCoordinator().retrieveUsedSegmentsForIntervals(
+ DATA_SOURCE,
+ Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
+ Segments.ONLY_VISIBLE
+ )
+ );
+
+ final Builder builder = new Builder(
+ DATA_SOURCE,
+ segmentLoaderFactory,
+ RETRY_POLICY_FACTORY
+ );
+
+ final Interval partialInterval = Intervals.of("2014-01-01T01:00:00/2014-01-01T02:00:00");
+ final CompactionTask partialCompactionTask = builder
+ .interval(partialInterval)
+ .segmentGranularity(Granularities.MINUTE)
+ .build();
+
+ final Pair<TaskStatus, List<DataSegment>> partialCompactionResult = runTask(partialCompactionTask);
+ Assert.assertTrue(partialCompactionResult.lhs.isSuccess());
+ // All segments in the previous expectedSegments should still appear as they have larger segment granularity.
+ expectedSegments.addAll(partialCompactionResult.rhs);
+
+ final Set<DataSegment> segmentsAfterPartialCompaction = new HashSet<>(
+ getStorageCoordinator().retrieveUsedSegmentsForIntervals(
+ DATA_SOURCE,
+ Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
+ Segments.ONLY_VISIBLE
+ )
+ );
+
+ Assert.assertEquals(expectedSegments, segmentsAfterPartialCompaction);
+
+ final CompactionTask fullCompactionTask = builder
+ .interval(Intervals.of("2014-01-01/2014-01-02"))
+ .segmentGranularity(null)
+ .build();
+
+ final Pair<TaskStatus, List<DataSegment>> fullCompactionResult = runTask(fullCompactionTask);
+ Assert.assertTrue(fullCompactionResult.lhs.isSuccess());
+
+ final List<DataSegment> segmentsAfterFullCompaction = new ArrayList<>(
+ getStorageCoordinator().retrieveUsedSegmentsForIntervals(
+ DATA_SOURCE,
+ Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
+ Segments.ONLY_VISIBLE
+ )
+ );
+ segmentsAfterFullCompaction.sort(
+ (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(s1.getInterval(), s2.getInterval())
+ );
+
+ Assert.assertEquals(3, segmentsAfterFullCompaction.size());
+ for (int i = 0; i < segmentsAfterFullCompaction.size(); i++) {
+ Assert.assertEquals(
+ Intervals.of(StringUtils.format("2014-01-01T%02d/2014-01-01T%02d", i, i + 1)),
+ segmentsAfterFullCompaction.get(i).getInterval()
+ );
+ }
+ }
+
+ @Test
public void testRunIndexAndCompactForSameSegmentAtTheSameTime() throws Exception
{
runIndexTask();
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java b/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java
index 779952b..ac9fdca 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularities;
@@ -44,16 +43,12 @@ public abstract class BaseGranularitySpec implements GranularitySpec
public static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.DAY;
public static final Granularity DEFAULT_QUERY_GRANULARITY = Granularities.NONE;
- protected List<Interval> inputIntervals;
+ protected final List<Interval> inputIntervals;
protected final Boolean rollup;
public BaseGranularitySpec(List<Interval> inputIntervals, Boolean rollup)
{
- if (inputIntervals != null) {
- this.inputIntervals = ImmutableList.copyOf(inputIntervals);
- } else {
- this.inputIntervals = Collections.emptyList();
- }
+ this.inputIntervals = inputIntervals == null ? Collections.emptyList() : inputIntervals;
this.rollup = rollup == null ? DEFAULT_ROLLUP : rollup;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org