You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ma...@apache.org on 2021/03/23 18:22:20 UTC

[druid] branch master updated: Allow overlapping intervals for the compaction task (#10912)

This is an automated email from the ASF dual-hosted git repository.

maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a041933  Allow overlapping intervals for the compaction task (#10912)
a041933 is described below

commit a04193301732b08977b2085f2f858751ab614141
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Tue Mar 23 11:21:54 2021 -0700

    Allow overlapping intervals for the compaction task (#10912)
    
    * Allow overlapping intervals for the compaction task
    
    * unused import
    
    * line indentation
    
    Co-authored-by: Maytas Monsereenusorn <ma...@apache.org>
---
 .../common/granularity/IntervalsByGranularity.java | 25 +-------
 .../util/common/IntervalsByGranularityTest.java    | 60 ++++++++++--------
 .../druid/indexing/common/task/CompactionTask.java |  6 +-
 .../common/task/CompactionTaskRunTest.java         | 74 ++++++++++++++++++++++
 .../indexing/granularity/BaseGranularitySpec.java  |  9 +--
 5 files changed, 117 insertions(+), 57 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java b/core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
index 7065535..ff076d4 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
@@ -23,16 +23,12 @@ import com.google.common.collect.FluentIterable;
 import org.apache.druid.common.guava.SettableSupplier;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.JodaUtils;
-import org.apache.druid.java.util.common.guava.Comparators;
 import org.joda.time.Interval;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Set;
 
 /**
  * Produce a stream of intervals generated by a given set of intervals as input and a given
@@ -51,19 +47,7 @@ public class IntervalsByGranularity
    */
   public IntervalsByGranularity(Collection<Interval> intervals, Granularity granularity)
   {
-    // eliminate dups, sort intervals:
-    Set<Interval> intervalSet = new HashSet<>(intervals);
-    List<Interval> inputIntervals = new ArrayList<>(intervals.size());
-    inputIntervals.addAll(intervalSet);
-    inputIntervals.sort(Comparators.intervalsByStartThenEnd());
-
-    // sanity check
-    if (JodaUtils.containOverlappingIntervals(inputIntervals)) {
-      throw new IAE("Intervals contain overlapping intervals [%s]", intervals);
-    }
-
-    // all good:
-    sortedNonOverlappingIntervals = inputIntervals;
+    this.sortedNonOverlappingIntervals = JodaUtils.condenseIntervals(intervals);
     this.granularity = granularity;
   }
 
@@ -73,9 +57,8 @@ public class IntervalsByGranularity
    */
   public Iterator<Interval> granularityIntervalsIterator()
   {
-    Iterator<Interval> ite;
     if (sortedNonOverlappingIntervals.isEmpty()) {
-      ite = Collections.emptyIterator();
+      return Collections.emptyIterator();
     } else {
       // The filter after transform & concat is to remove duplicates.
       // This can happen when condense left intervals that did not overlap but
@@ -85,7 +68,7 @@ public class IntervalsByGranularity
       // intervals will be returned, both with the same value 2013-01-01T00:00:00.000Z/2013-02-01T00:00:00.000Z.
       // Thus dups can be created given the right conditions....
       final SettableSupplier<Interval> previous = new SettableSupplier<>();
-      ite = FluentIterable.from(sortedNonOverlappingIntervals).transformAndConcat(granularity::getIterable)
+      return FluentIterable.from(sortedNonOverlappingIntervals).transformAndConcat(granularity::getIterable)
                           .filter(interval -> {
                             if (previous.get() != null && previous.get().equals(interval)) {
                               return false;
@@ -94,7 +77,5 @@ public class IntervalsByGranularity
                             return true;
                           }).iterator();
     }
-    return ite;
   }
-
 }
diff --git a/core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java b/core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java
index a38e6d5..ee01aa0 100644
--- a/core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java
+++ b/core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java
@@ -21,11 +21,13 @@ package org.apache.druid.java.util.common;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
 import org.joda.time.Interval;
 import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.util.Collections;
 import java.util.Iterator;
@@ -34,8 +36,8 @@ import java.util.NoSuchElementException;
 
 public class IntervalsByGranularityTest
 {
-  private static final long SECONDS_IN_YEAR = 31536000;
-
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
 
   @Test
   public void testTrivialIntervalExplosion()
@@ -46,17 +48,17 @@ public class IntervalsByGranularityTest
 
     IntervalsByGranularity intervals = new IntervalsByGranularity(
         ImmutableList.of(first, second, third),
-        Granularity.fromString("DAY")
+        Granularities.DAY
     );
 
     // get count:
     Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator();
-    long count = getCount(granularityIntervals);
-    Assert.assertTrue(count == 62 + 365);
+    long count = verifyIteratorAndReturnIntervalCount(granularityIntervals);
+    Assert.assertEquals(62 + 365, count);
 
     granularityIntervals = intervals.granularityIntervalsIterator();
     count = getCountWithNoHasNext(granularityIntervals);
-    Assert.assertTrue(count == 62 + 365);
+    Assert.assertEquals(62 + 365, count);
   }
 
 
@@ -69,13 +71,13 @@ public class IntervalsByGranularityTest
 
     IntervalsByGranularity intervals = new IntervalsByGranularity(
         ImmutableList.of(first, second, third),
-        Granularity.fromString("DAY")
+        Granularities.DAY
     );
 
     // get count:
     Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator();
-    long count = getCount(granularityIntervals);
-    Assert.assertTrue(count == 61);
+    long count = verifyIteratorAndReturnIntervalCount(granularityIntervals);
+    Assert.assertEquals(61, count);
   }
 
 
@@ -88,7 +90,7 @@ public class IntervalsByGranularityTest
     Interval first = Intervals.of("2012-01-01T00Z/P1Y");
     IntervalsByGranularity intervals = new IntervalsByGranularity(
         ImmutableList.of(first),
-        Granularity.fromString("SECOND")
+        Granularities.SECOND
     );
     Assert.assertEquals(
         ImmutableList.of(Intervals.of("2012-01-01T00Z/2013-01-01T00Z")),
@@ -96,22 +98,29 @@ public class IntervalsByGranularityTest
     );
   }
 
+  /**
+   * This test iterates huge intervals (2.5 years) with the SECOND granularity.
+   * The motivation behind this test is ensuring that IntervalsByGranularity can handle
+   * these huge intervals with a tiny granularity. However, this test takes a long time
+   * to populate all intervals based on the SECOND granularity (more than 1 min), so
+   * it is ignored by default. It should be converted from a unit test into a load test.
+   */
+  @Ignore
   @Test
-  public void testIntervalExplosion()
+  public void testIterateHugeIntervalsWithTinyGranularity()
   {
     Interval first = Intervals.of("2012-01-01T00Z/2012-12-31T00Z");
     Interval second = Intervals.of("2002-01-01T00Z/2002-12-31T00Z");
     Interval third = Intervals.of("2021-01-01T00Z/2021-06-30T00Z");
     IntervalsByGranularity intervals = new IntervalsByGranularity(
         ImmutableList.of(first, second, third),
-        Granularity.fromString("SECOND")
+        Granularities.SECOND
     );
 
     // get count:
     Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator();
-    long count = getCount(granularityIntervals);
-    Assert.assertTrue(count == 78537600);
-
+    long count = verifyIteratorAndReturnIntervalCount(granularityIntervals);
+    Assert.assertEquals(78537600, count);
   }
 
   @Test
@@ -132,7 +141,6 @@ public class IntervalsByGranularityTest
         ImmutableList.of(Intervals.of("2012-01-01T00Z/2012-02-01T00Z")),
         ImmutableList.copyOf(intervals.granularityIntervalsIterator())
     );
-
   }
 
   @Test
@@ -160,10 +168,9 @@ public class IntervalsByGranularityTest
         ),
         ImmutableList.copyOf(intervals.granularityIntervalsIterator())
     );
-
   }
 
-  @Test(expected = IAE.class)
+  @Test
   public void testOverlappingShouldThrow()
   {
     List<Interval> inputIntervals = ImmutableList.of(
@@ -174,10 +181,13 @@ public class IntervalsByGranularityTest
 
     IntervalsByGranularity intervals = new IntervalsByGranularity(
         inputIntervals,
-        Granularity.fromString("DAY")
+        Granularities.DAY
     );
-  }
 
+    Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator();
+    long count = verifyIteratorAndReturnIntervalCount(granularityIntervals);
+    Assert.assertEquals(14, count);
+  }
 
   @Test
   public void testWithGranularity()
@@ -190,13 +200,13 @@ public class IntervalsByGranularityTest
 
     IntervalsByGranularity intervals = new IntervalsByGranularity(
         inputIntervals,
-        Granularity.fromString("MONTH")
+        Granularities.MONTH
     );
 
     // get count:
     Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator();
-    long count = getCount(granularityIntervals);
-    Assert.assertTrue(count == 2);
+    long count = verifyIteratorAndReturnIntervalCount(granularityIntervals);
+    Assert.assertEquals(2, count);
   }
 
   @Test(expected = UnsupportedOperationException.class)
@@ -223,7 +233,7 @@ public class IntervalsByGranularityTest
     Assert.assertFalse(intervals.granularityIntervalsIterator().hasNext());
   }
 
-  private long getCount(Iterator<Interval> granularityIntervalIterator)
+  private long verifyIteratorAndReturnIntervalCount(Iterator<Interval> granularityIntervalIterator)
   {
     long count = 0;
     Interval previous = null;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index bdf250a..8234c35 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -22,6 +22,8 @@ package org.apache.druid.indexing.common.task;
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -142,8 +144,6 @@ public class CompactionTask extends AbstractBatchIndexTask
   @Nullable
   private final AggregatorFactory[] metricsSpec;
   @Nullable
-  private final Granularity segmentGranularity;
-  @Nullable
   private final ClientCompactionTaskGranularitySpec granularitySpec;
   @Nullable
   private final ParallelIndexTuningConfig tuningConfig;
@@ -207,7 +207,6 @@ public class CompactionTask extends AbstractBatchIndexTask
 
     this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
     this.metricsSpec = metricsSpec;
-    this.segmentGranularity = segmentGranularity;
     // Prior to apache/druid#10843 users could specify segmentGranularity using `segmentGranularity`
     // Now users should prefer to use `granularitySpec`
     // In case users accidentally specify both, and they are conflicting, warn the user instead of proceeding
@@ -308,6 +307,7 @@ public class CompactionTask extends AbstractBatchIndexTask
     return metricsSpec;
   }
 
+  @JsonInclude(Include.NON_NULL)
   @JsonProperty
   @Nullable
   @Override
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 95f0aaf..f3ee50c 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -56,6 +56,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
@@ -799,6 +800,79 @@ public class CompactionTaskRunTest extends IngestionTestBase
   }
 
   @Test
+  public void testPartialIntervalCompactWithFinerSegmentGranularityThenFullIntervalCompact() throws Exception
+  {
+    // This test fails with segment lock because of the bug reported in https://github.com/apache/druid/issues/10911.
+    if (lockGranularity == LockGranularity.SEGMENT) {
+      return;
+    }
+
+    runIndexTask();
+
+    final Set<DataSegment> expectedSegments = new HashSet<>(
+        getStorageCoordinator().retrieveUsedSegmentsForIntervals(
+            DATA_SOURCE,
+            Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
+            Segments.ONLY_VISIBLE
+        )
+    );
+
+    final Builder builder = new Builder(
+        DATA_SOURCE,
+        segmentLoaderFactory,
+        RETRY_POLICY_FACTORY
+    );
+
+    final Interval partialInterval = Intervals.of("2014-01-01T01:00:00/2014-01-01T02:00:00");
+    final CompactionTask partialCompactionTask = builder
+        .interval(partialInterval)
+        .segmentGranularity(Granularities.MINUTE)
+        .build();
+
+    final Pair<TaskStatus, List<DataSegment>> partialCompactionResult = runTask(partialCompactionTask);
+    Assert.assertTrue(partialCompactionResult.lhs.isSuccess());
+    // All segments in the previous expectedSegments should still appear as they have larger segment granularity.
+    expectedSegments.addAll(partialCompactionResult.rhs);
+
+    final Set<DataSegment> segmentsAfterPartialCompaction = new HashSet<>(
+        getStorageCoordinator().retrieveUsedSegmentsForIntervals(
+            DATA_SOURCE,
+            Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
+            Segments.ONLY_VISIBLE
+        )
+    );
+
+    Assert.assertEquals(expectedSegments, segmentsAfterPartialCompaction);
+
+    final CompactionTask fullCompactionTask = builder
+        .interval(Intervals.of("2014-01-01/2014-01-02"))
+        .segmentGranularity(null)
+        .build();
+
+    final Pair<TaskStatus, List<DataSegment>> fullCompactionResult = runTask(fullCompactionTask);
+    Assert.assertTrue(fullCompactionResult.lhs.isSuccess());
+
+    final List<DataSegment> segmentsAfterFullCompaction = new ArrayList<>(
+        getStorageCoordinator().retrieveUsedSegmentsForIntervals(
+            DATA_SOURCE,
+            Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
+            Segments.ONLY_VISIBLE
+        )
+    );
+    segmentsAfterFullCompaction.sort(
+        (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(s1.getInterval(), s2.getInterval())
+    );
+
+    Assert.assertEquals(3, segmentsAfterFullCompaction.size());
+    for (int i = 0; i < segmentsAfterFullCompaction.size(); i++) {
+      Assert.assertEquals(
+          Intervals.of(StringUtils.format("2014-01-01T%02d/2014-01-01T%02d", i, i + 1)),
+          segmentsAfterFullCompaction.get(i).getInterval()
+      );
+    }
+  }
+
+  @Test
   public void testRunIndexAndCompactForSameSegmentAtTheSameTime() throws Exception
   {
     runIndexTask();
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java b/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java
index 779952b..ac9fdca 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.granularity.Granularities;
@@ -44,16 +43,12 @@ public abstract class BaseGranularitySpec implements GranularitySpec
   public static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.DAY;
   public static final Granularity DEFAULT_QUERY_GRANULARITY = Granularities.NONE;
 
-  protected List<Interval> inputIntervals;
+  protected final List<Interval> inputIntervals;
   protected final Boolean rollup;
 
   public BaseGranularitySpec(List<Interval> inputIntervals, Boolean rollup)
   {
-    if (inputIntervals != null) {
-      this.inputIntervals = ImmutableList.copyOf(inputIntervals);
-    } else {
-      this.inputIntervals = Collections.emptyList();
-    }
+    this.inputIntervals = inputIntervals == null ? Collections.emptyList() : inputIntervals;
     this.rollup = rollup == null ? DEFAULT_ROLLUP : rollup;
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org