Posted to commits@druid.apache.org by su...@apache.org on 2021/01/29 14:02:39 UTC

[druid] branch master updated: Granularity interval materialization (#10742)

This is an automated email from the ASF dual-hosted git repository.

suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e4750b  Granularity interval materialization (#10742)
0e4750b is described below

commit 0e4750bac208d99f4a07545cc2f401f9bcdc1381
Author: Agustin Gonzalez <ag...@imply.io>
AuthorDate: Fri Jan 29 07:02:10 2021 -0700

    Granularity interval materialization (#10742)
    
    * Prevent interval materialization for UniformGranularitySpec inside the overlord
    
    * Change API of bucketIntervals in GranularitySpec to return an Iterable<Interval> (see the sketch after this list)
    
    * Javadoc update, respect inputIntervals contract
    
    * Eliminate dependency on wrappedspec (i.e. ArbitraryGranularity) in UniformGranularitySpec
    
    * Added one boundary condition test to UniformGranularityTest and fixed Travis forbidden method errors in IntervalsByGranularity
    
    * Fix Travis style & other checks
    
    * Refactor TreeSet to facilitate re-use in UniformGranularitySpec
    
    * Make sure intervals are unique when there is no segment granularity
    
    * Style/spotbugs fixes...
    
    * More travis checks
    
    * Add condensedIntervals method to GranularitySpec and pass it as needed to the lock method
    
    * Style & PR feedback
    
    * Fixed failing test
    
    * Fixed bug in IntervalsByGranularity iterator that caused it to return repeated elements (see added unit tests that were broken before this change)
    
    * Refactor so that we can get the condensed buckets without materializing the intervals
    
    * Get rid of GranularitySpec::condensedInputIntervals ... not needed
    
    * Travis failures fixes
    
    * Travis checkstyle fix
    
    * Edited/added javadoc comments and a method name (code review feedback)
    
    * Fixed jacoco coverage by moving class and adding more coverage
    
    * Avoid materializing the condensed intervals when locking
    
    * Deal with overlapping intervals
    
    * Remove code and use library code instead
    
    * Refactor intervals by granularity using the FluentIterable, add sanity checks
    
    * Change !hasNext() to inputIntervals().isEmpty()
    
    * Remove redundant lambda
    
    * Use materialized intervals here since this is outside the overlord (for performance)
    
    * Name refactor to reflect the fact that bucket intervals are sorted.
    
    * Style fixes
    
    * Removed redundant method and made condensedIntervalsIterator throw IAE when an element is null, for consistency with other methods in this class (besides, a null interval does not make sense when condensing)
    
    * Remove forbidden api
    
    * Move helper class inside common base class to reduce public space pollution
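
    A minimal sketch of how the bucketIntervals API change reads at call
    sites (shapes taken from the removed and added code in the diff below;
    the granularitySpec variable is illustrative):

        // Before: bucket intervals were materialized into a sorted set.
        Optional<SortedSet<Interval>> buckets = granularitySpec.bucketIntervals();
        if (buckets.isPresent()) {
          for (Interval interval : buckets.get()) {
            // one lock / reducer / shard per bucket ...
          }
        }

        // After: bucket intervals are a lazy, sorted Iterable.
        Iterable<Interval> sortedBuckets = granularitySpec.sortedBucketIntervals();
        if (sortedBuckets.iterator().hasNext()) {
          for (Interval interval : sortedBuckets) {
            // same work, without materializing every bucket up front ...
          }
        }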
---
 .../apache/druid/java/util/common/JodaUtils.java   | 151 ++++++++++--
 .../common/granularity/IntervalsByGranularity.java | 100 ++++++++
 .../apache/druid/common/utils/JodaUtilsTest.java   | 199 ++++++++++++++-
 .../util/common/IntervalsByGranularityTest.java    | 266 +++++++++++++++++++++
 .../indexer/DetermineHashedPartitionsJob.java      |  26 +-
 .../druid/indexer/DeterminePartitionsJob.java      |   6 +-
 .../HadoopDruidDetermineConfigurationJob.java      |   3 +-
 .../druid/indexer/HadoopDruidIndexerConfig.java    |  30 ++-
 .../druid/indexer/HadoopDruidIndexerMapper.java    |   2 +-
 .../HadoopDruidDetermineConfigurationJobTest.java  |   5 +-
 .../druid/indexer/HadoopIngestionSpecTest.java     |   2 +-
 .../druid/indexer/IndexGeneratorJobTest.java       |   2 +-
 .../common/task/AbstractBatchIndexTask.java        |  70 +++---
 .../indexing/common/task/HadoopIndexTask.java      |  21 +-
 .../druid/indexing/common/task/IndexTask.java      |  18 +-
 .../parallel/ParallelIndexSupervisorTask.java      |  50 ++--
 .../parallel/PartialDimensionDistributionTask.java |   5 +-
 .../parallel/PartialHashSegmentGenerateTask.java   |   3 +-
 .../task/batch/parallel/SinglePhaseSubTask.java    |  16 +-
 .../granularity/ArbitraryGranularitySpec.java      |  70 ++----
 .../indexing/granularity/BaseGranularitySpec.java  | 140 +++++++++++
 .../indexing/granularity/GranularitySpec.java      |  19 +-
 .../granularity/UniformGranularitySpec.java        |  81 ++-----
 .../granularity/ArbitraryGranularityTest.java      |  48 +++-
 .../granularity/UniformGranularityTest.java        |  77 ++++--
 25 files changed, 1107 insertions(+), 303 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java b/core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java
index a225693..96c49d9 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java
@@ -19,16 +19,23 @@
 
 package org.apache.druid.java.util.common;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
 /**
+ *
  */
 public class JodaUtils
 {
@@ -36,10 +43,17 @@ public class JodaUtils
   public static final long MAX_INSTANT = Long.MAX_VALUE / 2;
   public static final long MIN_INSTANT = Long.MIN_VALUE / 2;
 
-  public static ArrayList<Interval> condenseIntervals(Iterable<Interval> intervals)
+  /**
+   * This method will not materialize the input intervals if they represent
+   * a SortedSet (i.e. implement that interface). If not, the method internally
+   * creates a sorted set and populates it with them, thus materializing the
+   * intervals in the input.
+   *
+   * @param intervals The Iterable object containing the intervals to condense
+   * @return The condensed intervals
+   */
+  public static List<Interval> condenseIntervals(Iterable<Interval> intervals)
   {
-    ArrayList<Interval> retVal = new ArrayList<>();
-
     final SortedSet<Interval> sortedIntervals;
 
     if (intervals instanceof SortedSet) {
@@ -50,35 +64,122 @@ public class JodaUtils
         sortedIntervals.add(interval);
       }
     }
+    return ImmutableList.copyOf(condensedIntervalsIterator(sortedIntervals.iterator()));
+  }
 
-    if (sortedIntervals.isEmpty()) {
-      return new ArrayList<>();
+  /**
+   * This method does not materialize the intervals represented by the
+   * sortedIntervals iterator. However, the caller needs to ensure that sortedIntervals
+   * is already sorted in ascending order (use Comparators.intervalsByStartThenEnd()).
+   * It avoids materialization by incrementally condensing the intervals,
+   * starting from the first and looking for "adjacent" intervals. This is
+   * possible since the intervals in the Iterator are in ascending order (as
+   * guaranteed by the caller).
+   * <p>
+   *
+   * @param sortedIntervals The iterator object containing the intervals to condense
+   * @return An iterator for the condensed intervals. By construction the condensed intervals are sorted
+   * in ascending order and contain no repeated elements.
+   * @throws IAE if an element is null or if sortedIntervals is not sorted in ascending order
+   */
+  public static Iterator<Interval> condensedIntervalsIterator(Iterator<Interval> sortedIntervals)
+  {
+
+    if (sortedIntervals == null || !sortedIntervals.hasNext()) {
+      return Collections.emptyIterator();
     }
 
-    Iterator<Interval> intervalsIter = sortedIntervals.iterator();
-    Interval currInterval = intervalsIter.next();
-    while (intervalsIter.hasNext()) {
-      Interval next = intervalsIter.next();
-
-      if (currInterval.abuts(next)) {
-        currInterval = new Interval(currInterval.getStart(), next.getEnd());
-      } else if (currInterval.overlaps(next)) {
-        DateTime nextEnd = next.getEnd();
-        DateTime currEnd = currInterval.getEnd();
-        currInterval = new Interval(
-            currInterval.getStart(),
-            nextEnd.isAfter(currEnd) ? nextEnd : currEnd
-        );
-      } else {
-        retVal.add(currInterval);
-        currInterval = next;
+    final PeekingIterator<Interval> peekingIterator = Iterators.peekingIterator(sortedIntervals);
+    return new Iterator<Interval>()
+    {
+      private Interval previous;
+
+      @Override
+      public boolean hasNext()
+      {
+        return peekingIterator.hasNext();
       }
-    }
-    retVal.add(currInterval);
 
+      @Override
+      public Interval next()
+      {
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+
+        Interval currInterval = peekingIterator.next();
+        if (currInterval == null) {
+          throw new IAE("Element of intervals is null");
+        }
+
+        // check sorted ascending:
+        verifyAscendingSortOrder(previous, currInterval);
+
+        previous = currInterval;
+
+        while (hasNext()) {
+          Interval next = peekingIterator.peek();
+          if (next == null) {
+            throw new IAE("Element of intervals is null");
+          }
+
+          if (currInterval.abuts(next)) {
+            currInterval = new Interval(currInterval.getStart(), next.getEnd());
+            peekingIterator.next();
+          } else if (currInterval.overlaps(next)) {
+            DateTime nextEnd = next.getEnd();
+            DateTime currEnd = currInterval.getEnd();
+            currInterval = new Interval(
+                currInterval.getStart(),
+                nextEnd.isAfter(currEnd) ? nextEnd : currEnd
+            );
+            peekingIterator.next();
+          } else {
+            break;
+          }
+        }
+        return currInterval;
+      }
+    };
+  }
+
+  /**
+   * Verify whether an iterable of intervals contains overlapping intervals
+   *
+   * @param intervals An interval iterable sorted using Comparators.intervalsByStartThenEnd()
+   * @return true if the iterable contains at least two overlapping intervals, false otherwise.
+   * @throws IAE when at least one element is null or when the iterable is not sorted
+   */
+  public static boolean containOverlappingIntervals(Iterable<Interval> intervals)
+  {
+    if (intervals == null) {
+      return false;
+    }
+    boolean retVal = false;
+    Interval previous = null;
+    for (Interval current : intervals) {
+      if (current == null) {
+        throw new IAE("Intervals should not contain nulls");
+      }
+      verifyAscendingSortOrder(previous, current);
+      if (previous != null && previous.overlaps(current)) {
+        retVal = true;
+        break;
+      }
+      previous = current;
+    }
     return retVal;
   }
 
+  private static void verifyAscendingSortOrder(Interval previous, Interval current)
+  {
+    if (previous != null && previous.isAfter(current)) {
+      throw new IAE("Adjacent intervals are not sorted [%s,%s]", previous, current);
+    }
+  }
+
   public static Interval umbrellaInterval(Iterable<Interval> intervals)
   {
     ArrayList<DateTime> startDates = new ArrayList<>();
@@ -137,4 +238,6 @@ public class JodaUtils
         return max;
     }
   }
+
+
 }
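
A minimal usage sketch of the new lazy condensing entry point (types and
helpers as exercised by the tests below; the interval values are illustrative):

    List<Interval> intervals = new ArrayList<>(Arrays.asList(
        Intervals.of("2011-01-01/2011-01-02"),
        Intervals.of("2011-01-02/2011-01-05"),  // abuts the first, so the two merge
        Intervals.of("2011-02-01/2011-02-03")
    ));
    intervals.sort(Comparators.intervalsByStartThenEnd());  // caller must pre-sort

    Iterator<Interval> condensed = JodaUtils.condensedIntervalsIterator(intervals.iterator());
    while (condensed.hasNext()) {
      // yields 2011-01-01/2011-01-05, then 2011-02-01/2011-02-03, without
      // ever materializing the condensed list
      System.out.println(condensed.next());
    }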
diff --git a/core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java b/core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
new file mode 100644
index 0000000..7065535
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.granularity;
+
+import com.google.common.collect.FluentIterable;
+import org.apache.druid.common.guava.SettableSupplier;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Produce a stream of intervals generated from a given set of input intervals and a given
+ * granularity. This class avoids materializing the granularity intervals whenever possible.
+ */
+public class IntervalsByGranularity
+{
+  private final List<Interval> sortedNonOverlappingIntervals;
+  private final Granularity granularity;
+
+  /**
+   * @param intervals   Intervals for which to apply the given granularity. They should
+   *                    not contain overlapping intervals.
+   * @param granularity The granularity to apply
+   * @throws IAE if intervals contains at least one overlapping pair
+   */
+  public IntervalsByGranularity(Collection<Interval> intervals, Granularity granularity)
+  {
+    // eliminate dups, sort intervals:
+    Set<Interval> intervalSet = new HashSet<>(intervals);
+    List<Interval> inputIntervals = new ArrayList<>(intervals.size());
+    inputIntervals.addAll(intervalSet);
+    inputIntervals.sort(Comparators.intervalsByStartThenEnd());
+
+    // sanity check
+    if (JodaUtils.containOverlappingIntervals(inputIntervals)) {
+      throw new IAE("Intervals contain overlapping intervals [%s]", intervals);
+    }
+
+    // all good:
+    sortedNonOverlappingIntervals = inputIntervals;
+    this.granularity = granularity;
+  }
+
+  /**
+   * @return The intervals according to the granularity. The intervals are provided in
+   * order according to Comparators.intervalsByStartThenEnd()
+   */
+  public Iterator<Interval> granularityIntervalsIterator()
+  {
+    Iterator<Interval> ite;
+    if (sortedNonOverlappingIntervals.isEmpty()) {
+      ite = Collections.emptyIterator();
+    } else {
+      // The filter after transform & concat is to remove duplicates.
+      // Duplicates can appear when the input contains intervals that do not
+      // overlap but map to the same bucket once a larger granularity is applied.
+      // For example, given the inputs 2013-01-01T00Z/2013-01-10T00Z and
+      // 2013-01-15T00Z/2013-01-20T00Z with MONTH granularity, two intervals
+      // would be produced, both with the same value
+      // 2013-01-01T00:00:00.000Z/2013-02-01T00:00:00.000Z.
+      final SettableSupplier<Interval> previous = new SettableSupplier<>();
+      ite = FluentIterable.from(sortedNonOverlappingIntervals).transformAndConcat(granularity::getIterable)
+                          .filter(interval -> {
+                            if (previous.get() != null && previous.get().equals(interval)) {
+                              return false;
+                            }
+                            previous.set(interval);
+                            return true;
+                          }).iterator();
+    }
+    return ite;
+  }
+
+}
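
A minimal usage sketch mirroring the duplicate-elimination comment above
(interval values taken from that comment):

    IntervalsByGranularity monthBuckets = new IntervalsByGranularity(
        ImmutableList.of(
            Intervals.of("2013-01-01T00Z/2013-01-10T00Z"),
            Intervals.of("2013-01-15T00Z/2013-01-20T00Z")
        ),
        Granularities.MONTH
    );
    Iterator<Interval> it = monthBuckets.granularityIntervalsIterator();
    while (it.hasNext()) {
      // prints 2013-01-01T00:00:00.000Z/2013-02-01T00:00:00.000Z exactly once;
      // the duplicate month bucket is filtered out
      System.out.println(it.next());
    }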
diff --git a/core/src/test/java/org/apache/druid/common/utils/JodaUtilsTest.java b/core/src/test/java/org/apache/druid/common/utils/JodaUtilsTest.java
index c5610f9..d7bfb4a 100644
--- a/core/src/test/java/org/apache/druid/common/utils/JodaUtilsTest.java
+++ b/core/src/test/java/org/apache/druid/common/utils/JodaUtilsTest.java
@@ -19,8 +19,11 @@
 
 package org.apache.druid.common.utils;
 
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.java.util.common.guava.Comparators;
 import org.joda.time.Duration;
 import org.joda.time.Interval;
 import org.joda.time.Period;
@@ -32,6 +35,7 @@ import java.util.Collections;
 import java.util.List;
 
 /**
+ *
  */
 public class JodaUtilsTest
 {
@@ -75,6 +79,40 @@ public class JodaUtilsTest
         Intervals.of("2011-03-05/2011-03-06")
     );
 
+    List<Interval> expected = Arrays.asList(
+        Intervals.of("2011-01-01/2011-01-03"),
+        Intervals.of("2011-02-01/2011-02-08"),
+        Intervals.of("2011-03-01/2011-03-02"),
+        Intervals.of("2011-03-03/2011-03-04"),
+        Intervals.of("2011-03-05/2011-03-06")
+    );
+
+    List<Interval> actual = JodaUtils.condenseIntervals(intervals);
+
+    Assert.assertEquals(
+        expected,
+        actual
+    );
+
+  }
+
+
+  @Test
+  public void testCondenseIntervalsSimpleSortedIterator()
+  {
+    List<Interval> intervals = Arrays.asList(
+        Intervals.of("2011-01-01/2011-01-02"),
+        Intervals.of("2011-01-02/2011-01-03"),
+        Intervals.of("2011-02-03/2011-02-08"),
+        Intervals.of("2011-02-01/2011-02-02"),
+        Intervals.of("2011-02-01/2011-02-05"),
+        Intervals.of("2011-03-01/2011-03-02"),
+        Intervals.of("2011-03-03/2011-03-04"),
+        Intervals.of("2011-03-05/2011-03-06")
+    );
+    intervals.sort(Comparators.intervalsByStartThenEnd());
+
+    List<Interval> actual = ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator()));
     Assert.assertEquals(
         Arrays.asList(
             Intervals.of("2011-01-01/2011-01-03"),
@@ -83,7 +121,116 @@ public class JodaUtilsTest
             Intervals.of("2011-03-03/2011-03-04"),
             Intervals.of("2011-03-05/2011-03-06")
         ),
-        JodaUtils.condenseIntervals(intervals)
+        actual
+    );
+
+  }
+
+  @Test
+  public void testCondenseIntervalsSimpleSortedIteratorOverlapping()
+  {
+    List<Interval> intervals = Arrays.asList(
+        Intervals.of("2011-02-01/2011-03-10"),
+        Intervals.of("2011-01-02/2011-02-03"),
+        Intervals.of("2011-01-07/2015-01-19"),
+        Intervals.of("2011-01-15/2011-01-19"),
+        Intervals.of("2011-01-01/2011-01-02"),
+        Intervals.of("2011-02-01/2011-03-10")
+    );
+
+    intervals.sort(Comparators.intervalsByStartThenEnd());
+
+    Assert.assertEquals(
+        Collections.singletonList(
+            Intervals.of("2011-01-01/2015-01-19")
+        ),
+        ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator()))
+    );
+  }
+
+  @Test(expected = IAE.class)
+  public void testCondenseIntervalsSimplSortedIteratorOverlappingWithNullsShouldThrow()
+  {
+    List<Interval> intervals = Arrays.asList(
+        Intervals.of("2011-01-02/2011-02-03"),
+        Intervals.of("2011-02-01/2011-03-10"),
+        null,
+        Intervals.of("2011-03-07/2011-04-19"),
+        Intervals.of("2011-04-01/2015-01-19"),
+        null
+    );
+    ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator()));
+  }
+
+  @Test(expected = IAE.class)
+  public void testCondenseIntervalsSimplSortedIteratorOverlappingWithNullFirstAndLastshouldThrow()
+  {
+    List<Interval> intervals = Arrays.asList(
+        null,
+        Intervals.of("2011-01-02/2011-02-03"),
+        Intervals.of("2011-02-01/2011-03-10"),
+        Intervals.of("2011-03-07/2011-04-19"),
+        Intervals.of("2011-04-01/2015-01-19"),
+        null
+    );
+    ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator()));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testCondenseIntervalsSimpleUnsortedIterator()
+  {
+    List<Interval> intervals = Arrays.asList(
+        Intervals.of("2011-01-01/2011-01-02"),
+        Intervals.of("2011-01-02/2011-01-03"),
+        Intervals.of("2011-02-03/2011-02-08"),
+        Intervals.of("2011-02-01/2011-02-02"),
+        Intervals.of("2011-02-01/2011-02-05"),
+        Intervals.of("2011-03-01/2011-03-02"),
+        Intervals.of("2011-03-03/2011-03-04"),
+        Intervals.of("2011-03-05/2011-03-06")
+    );
+    ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator()));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testCondenseIntervalsSimpleUnsortedIteratorSmallestAtEnd()
+  {
+    List<Interval> intervals = Arrays.asList(
+        Intervals.of("2011-01-01/2011-01-02"),
+        Intervals.of("2011-02-01/2011-02-04"),
+        Intervals.of("2011-03-01/2011-03-04"),
+        Intervals.of("2010-01-01/2010-03-04")
+    );
+    ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator()));
+  }
+
+  @Test
+  public void testCondenseIntervalsIteratorWithDups()
+  {
+    List<Interval> intervals = Arrays.asList(
+        Intervals.of("2011-01-01/2011-01-02"),
+        Intervals.of("2011-02-04/2011-02-05"),
+        Intervals.of("2011-01-01/2011-01-02"),
+        Intervals.of("2011-01-02/2011-01-03"),
+        Intervals.of("2011-02-03/2011-02-08"),
+        Intervals.of("2011-02-03/2011-02-08"),
+        Intervals.of("2011-02-01/2011-02-02"),
+        Intervals.of("2011-02-01/2011-02-05"),
+        Intervals.of("2011-03-01/2011-03-02"),
+        Intervals.of("2011-03-03/2011-03-04"),
+        Intervals.of("2011-03-05/2011-03-06")
+    );
+    intervals.sort(Comparators.intervalsByStartThenEnd());
+
+    Assert.assertEquals(
+        Arrays.asList(
+            Intervals.of("2011-01-01/2011-01-03"),
+            Intervals.of("2011-02-01/2011-02-08"),
+            Intervals.of("2011-03-01/2011-03-02"),
+            Intervals.of("2011-03-03/2011-03-04"),
+            Intervals.of("2011-03-05/2011-03-06")
+        ),
+        ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator()))
     );
   }
 
@@ -144,4 +291,54 @@ public class JodaUtilsTest
     Assert.assertEquals(Long.MAX_VALUE, period.getMinutes());
   }
 
+  @Test
+  public void testShouldContainOverlappingIntervals()
+  {
+    List<Interval> intervals = Arrays.asList(
+        Intervals.of("2011-02-01/2011-03-10"),
+        Intervals.of("2011-03-25/2011-04-03"),
+        Intervals.of("2011-04-01/2015-01-19"),
+        Intervals.of("2016-01-15/2016-01-19")
+    );
+    Assert.assertTrue(JodaUtils.containOverlappingIntervals(intervals));
+  }
+
+
+  @Test
+  public void testShouldNotContainOverlappingIntervals()
+  {
+    List<Interval> intervals = Arrays.asList(
+        Intervals.of("2011-02-01/2011-03-10"),
+        Intervals.of("2011-03-10/2011-04-03"),
+        Intervals.of("2011-04-04/2015-01-14"),
+        Intervals.of("2016-01-15/2016-01-19")
+    );
+    Assert.assertFalse(JodaUtils.containOverlappingIntervals(intervals));
+  }
+
+  @Test(expected = IAE.class)
+  public void testOverlappingIntervalsContainsNull()
+  {
+    List<Interval> intervals = Arrays.asList(
+        Intervals.of("2011-02-01/2011-03-10"),
+        null,
+        Intervals.of("2011-04-04/2015-01-14"),
+        Intervals.of("2016-01-15/2016-01-19")
+    );
+    JodaUtils.containOverlappingIntervals(intervals);
+  }
+
+  @Test(expected = IAE.class)
+  public void testOverlappingIntervalsContainsUnsorted()
+  {
+    List<Interval> intervals = Arrays.asList(
+        Intervals.of("2011-02-01/2011-03-10"),
+        Intervals.of("2011-03-10/2011-04-03"),
+        Intervals.of("2016-01-15/2016-01-19"),
+        Intervals.of("2011-04-04/2015-01-14")
+    );
+    JodaUtils.containOverlappingIntervals(intervals);
+  }
+
+
 }
diff --git a/core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java b/core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java
new file mode 100644
index 0000000..a38e6d5
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public class IntervalsByGranularityTest
+{
+  private static final long SECONDS_IN_YEAR = 31536000;
+
+
+  @Test
+  public void testTrivialIntervalExplosion()
+  {
+    Interval first = Intervals.of("2013-01-01T00Z/2013-02-01T00Z");
+    Interval second = Intervals.of("2012-01-01T00Z/2012-02-01T00Z");
+    Interval third = Intervals.of("2002-01-01T00Z/2003-01-01T00Z");
+
+    IntervalsByGranularity intervals = new IntervalsByGranularity(
+        ImmutableList.of(first, second, third),
+        Granularity.fromString("DAY")
+    );
+
+    // get count:
+    Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator();
+    long count = getCount(granularityIntervals);
+    Assert.assertTrue(count == 62 + 365);
+
+    granularityIntervals = intervals.granularityIntervalsIterator();
+    count = getCountWithNoHasNext(granularityIntervals);
+    Assert.assertTrue(count == 62 + 365);
+  }
+
+
+  @Test
+  public void testDups()
+  {
+    Interval first = Intervals.of("2013-01-01T00Z/2013-02-01T00Z");
+    Interval second = Intervals.of("2012-04-01T00Z/2012-05-01T00Z");
+    Interval third = Intervals.of("2013-01-01T00Z/2013-02-01T00Z"); // dup
+
+    IntervalsByGranularity intervals = new IntervalsByGranularity(
+        ImmutableList.of(first, second, third),
+        Granularity.fromString("DAY")
+    );
+
+    // get count:
+    Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator();
+    long count = getCount(granularityIntervals);
+    Assert.assertTrue(count == 61);
+  }
+
+
+  @Test
+  public void testCondenseForManyIntervals()
+  {
+    // This test verifies that there are no issues when condense is called
+    // with an iterator producing millions of intervals (since the version of condense
+    // used here takes an interval iterator and does not materialize intervals)
+    Interval first = Intervals.of("2012-01-01T00Z/P1Y");
+    IntervalsByGranularity intervals = new IntervalsByGranularity(
+        ImmutableList.of(first),
+        Granularity.fromString("SECOND")
+    );
+    Assert.assertEquals(
+        ImmutableList.of(Intervals.of("2012-01-01T00Z/2013-01-01T00Z")),
+        ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.granularityIntervalsIterator()))
+    );
+  }
+
+  @Test
+  public void testIntervalExplosion()
+  {
+    Interval first = Intervals.of("2012-01-01T00Z/2012-12-31T00Z");
+    Interval second = Intervals.of("2002-01-01T00Z/2002-12-31T00Z");
+    Interval third = Intervals.of("2021-01-01T00Z/2021-06-30T00Z");
+    IntervalsByGranularity intervals = new IntervalsByGranularity(
+        ImmutableList.of(first, second, third),
+        Granularity.fromString("SECOND")
+    );
+
+    // get count:
+    Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator();
+    long count = getCount(granularityIntervals);
+    Assert.assertTrue(count == 78537600);
+
+  }
+
+  @Test
+  public void testSimpleEliminateRepeated()
+  {
+    final List<Interval> inputIntervals = ImmutableList.of(
+        Intervals.of("2012-01-08T00Z/2012-01-11T00Z"),
+        Intervals.of("2012-01-07T00Z/2012-01-08T00Z"),
+        Intervals.of("2012-01-03T00Z/2012-01-04T00Z"),
+        Intervals.of("2012-01-01T00Z/2012-01-03T00Z")
+    );
+    IntervalsByGranularity intervals = new IntervalsByGranularity(
+        inputIntervals,
+        Granularities.MONTH
+    );
+
+    Assert.assertEquals(
+        ImmutableList.of(Intervals.of("2012-01-01T00Z/2012-02-01T00Z")),
+        ImmutableList.copyOf(intervals.granularityIntervalsIterator())
+    );
+
+  }
+
+  @Test
+  public void testALittleMoreComplexEliminateRepeated()
+  {
+    final List<Interval> inputIntervals = ImmutableList.of(
+        Intervals.of("2015-01-08T00Z/2015-01-11T00Z"),
+        Intervals.of("2012-01-08T00Z/2012-01-11T00Z"),
+        Intervals.of("2012-01-07T00Z/2012-01-08T00Z"),
+        Intervals.of("2012-01-03T00Z/2012-01-04T00Z"),
+        Intervals.of("2012-01-01T00Z/2012-01-03T00Z"),
+        Intervals.of("2007-03-08T00Z/2007-04-11T00Z")
+    );
+    IntervalsByGranularity intervals = new IntervalsByGranularity(
+        inputIntervals,
+        Granularities.MONTH
+    );
+
+    Assert.assertEquals(
+        ImmutableList.of(
+            Intervals.of("2007-03-01T00Z/2007-04-01T00Z"),
+            Intervals.of("2007-04-01T00Z/2007-05-01T00Z"),
+            Intervals.of("2012-01-01T00Z/2012-02-01T00Z"),
+            Intervals.of("2015-01-01T00Z/2015-02-01T00Z")
+        ),
+        ImmutableList.copyOf(intervals.granularityIntervalsIterator())
+    );
+
+  }
+
+  @Test(expected = IAE.class)
+  public void testOverlappingShouldThrow()
+  {
+    List<Interval> inputIntervals = ImmutableList.of(
+        Intervals.of("2013-01-01T00Z/2013-01-11T00Z"),
+        Intervals.of("2013-01-05T00Z/2013-01-08T00Z"),
+        Intervals.of("2013-01-07T00Z/2013-01-15T00Z")
+    );
+
+    IntervalsByGranularity intervals = new IntervalsByGranularity(
+        inputIntervals,
+        Granularity.fromString("DAY")
+    );
+  }
+
+
+  @Test
+  public void testWithGranularity()
+  {
+    List<Interval> inputIntervals = ImmutableList.of(
+        Intervals.of("2013-01-01T00Z/2013-01-10T00Z"),
+        Intervals.of("2013-01-15T00Z/2013-01-20T00Z"),
+        Intervals.of("2013-02-07T00Z/2013-02-15T00Z")
+    );
+
+    IntervalsByGranularity intervals = new IntervalsByGranularity(
+        inputIntervals,
+        Granularity.fromString("MONTH")
+    );
+
+    // get count:
+    Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator();
+    long count = getCount(granularityIntervals);
+    Assert.assertTrue(count == 2);
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testRemoveThrowsException()
+  {
+    final List<Interval> inputIntervals = ImmutableList.of(
+        Intervals.of("2015-01-08T00Z/2015-01-11T00Z")
+    );
+    IntervalsByGranularity intervals = new IntervalsByGranularity(
+        inputIntervals,
+        Granularities.MONTH
+    );
+    intervals.granularityIntervalsIterator().remove();
+  }
+
+  @Test
+  public void testEmptyInput()
+  {
+    final List<Interval> inputIntervals = Collections.emptyList();
+    IntervalsByGranularity intervals = new IntervalsByGranularity(
+        inputIntervals,
+        Granularities.MONTH
+    );
+    Assert.assertFalse(intervals.granularityIntervalsIterator().hasNext());
+  }
+
+  private long getCount(Iterator<Interval> granularityIntervalIterator)
+  {
+    long count = 0;
+    Interval previous = null;
+    Interval current;
+    while (granularityIntervalIterator.hasNext()) {
+      current = granularityIntervalIterator.next();
+      if (previous != null) {
+        Assert.assertTrue(previous + "," + current, previous.getEndMillis() <= current.getStartMillis());
+      }
+      previous = current;
+      count++;
+    }
+    return count;
+  }
+
+  private long getCountWithNoHasNext(Iterator<Interval> granularityIntervalIterator)
+  {
+    long count = 0;
+    Interval previous = null;
+    Interval current;
+
+    while (true) {
+      try {
+        current = granularityIntervalIterator.next();
+      }
+      catch (NoSuchElementException e) {
+        // done
+        break;
+      }
+      if (previous != null) {
+        Assert.assertTrue(previous.getEndMillis() <= current.getStartMillis());
+      }
+      previous = current;
+      count++;
+    }
+
+    return count;
+  }
+
+}
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
index b1ec546..1d0c3c4 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexer;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
@@ -64,7 +65,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeMap;
 
 /**
@@ -109,10 +109,10 @@ public class DetermineHashedPartitionsJob implements Jobby
       groupByJob.setOutputValueClass(NullWritable.class);
       groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
       groupByJob.setPartitionerClass(DetermineHashedPartitionsPartitioner.class);
-      if (!config.getSegmentGranularIntervals().isPresent()) {
+      if (config.getInputIntervals().isEmpty()) {
         groupByJob.setNumReduceTasks(1);
       } else {
-        groupByJob.setNumReduceTasks(config.getSegmentGranularIntervals().get().size());
+        groupByJob.setNumReduceTasks(Iterators.size(config.getSegmentGranularIntervals().iterator()));
       }
       JobHelper.setupClasspath(
           JobHelper.distributedClassPath(config.getWorkingPath()),
@@ -151,7 +151,7 @@ public class DetermineHashedPartitionsJob implements Jobby
 
       log.info("Job completed, loading up partitions for intervals[%s].", config.getSegmentGranularIntervals());
       FileSystem fileSystem = null;
-      if (!config.getSegmentGranularIntervals().isPresent()) {
+      if (config.getInputIntervals().isEmpty()) {
         final Path intervalInfoPath = config.makeIntervalInfoPath();
         fileSystem = intervalInfoPath.getFileSystem(groupByJob.getConfiguration());
         if (!Utils.exists(groupByJob, fileSystem, intervalInfoPath)) {
@@ -159,7 +159,9 @@ public class DetermineHashedPartitionsJob implements Jobby
         }
         List<Interval> intervals = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(
             Utils.openInputStream(groupByJob, intervalInfoPath),
-            new TypeReference<List<Interval>>() {}
+            new TypeReference<List<Interval>>()
+            {
+            }
         );
         config.setGranularitySpec(
             new UniformGranularitySpec(
@@ -182,7 +184,7 @@ public class DetermineHashedPartitionsJob implements Jobby
       }
       HashPartitionFunction partitionFunction = ((HashedPartitionsSpec) partitionsSpec).getPartitionFunction();
       int shardCount = 0;
-      for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
+      for (Interval segmentGranularity : config.getSegmentGranularIntervals()) {
         DateTime bucket = segmentGranularity.getStart();
 
         final Path partitionInfoPath = config.makeSegmentPartitionInfoPath(segmentGranularity);
@@ -295,11 +297,11 @@ public class DetermineHashedPartitionsJob implements Jobby
       super.setup(context);
       rollupGranularity = getConfig().getGranularitySpec().getQueryGranularity();
       config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
-      Optional<Set<Interval>> intervals = config.getSegmentGranularIntervals();
-      if (intervals.isPresent()) {
+      Iterable<Interval> intervals = config.getSegmentGranularIntervals();
+      if (intervals.iterator().hasNext()) {
         determineIntervals = false;
         final ImmutableMap.Builder<Interval, HyperLogLogCollector> builder = ImmutableMap.builder();
-        for (final Interval bucketInterval : intervals.get()) {
+        for (final Interval bucketInterval : intervals) {
           builder.put(bucketInterval, HyperLogLogCollector.makeLatestCollector());
         }
         hyperLogLogs = builder.build();
@@ -376,7 +378,7 @@ public class DetermineHashedPartitionsJob implements Jobby
     protected void setup(Context context)
     {
       config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
-      determineIntervals = !config.getSegmentGranularIntervals().isPresent();
+      determineIntervals = config.getInputIntervals().isEmpty();
     }
 
     @Override
@@ -477,11 +479,11 @@ public class DetermineHashedPartitionsJob implements Jobby
     {
       this.config = config;
       HadoopDruidIndexerConfig hadoopConfig = HadoopDruidIndexerConfig.fromConfiguration(config);
-      if (hadoopConfig.getSegmentGranularIntervals().isPresent()) {
+      if (!hadoopConfig.getInputIntervals().isEmpty()) {
         determineIntervals = false;
         int reducerNumber = 0;
         ImmutableMap.Builder<LongWritable, Integer> builder = ImmutableMap.builder();
-        for (Interval interval : hadoopConfig.getSegmentGranularIntervals().get()) {
+        for (Interval interval : hadoopConfig.getSegmentGranularIntervals()) {
           builder.put(new LongWritable(interval.getStartMillis()), reducerNumber++);
         }
         reducerLookup = builder.build();
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
index 1e810c6..ebe69cd 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
@@ -212,7 +212,7 @@ public class DeterminePartitionsJob implements Jobby
       dimSelectionJob.setOutputKeyClass(BytesWritable.class);
       dimSelectionJob.setOutputValueClass(Text.class);
       dimSelectionJob.setOutputFormatClass(DeterminePartitionsDimSelectionOutputFormat.class);
-      dimSelectionJob.setNumReduceTasks(config.getGranularitySpec().bucketIntervals().get().size());
+      dimSelectionJob.setNumReduceTasks(Iterators.size(config.getGranularitySpec().sortedBucketIntervals().iterator()));
       JobHelper.setupClasspath(
           JobHelper.distributedClassPath(config.getWorkingPath()),
           JobHelper.distributedClassPath(config.makeIntermediatePath()),
@@ -256,7 +256,7 @@ public class DeterminePartitionsJob implements Jobby
       FileSystem fileSystem = null;
       Map<Long, List<HadoopyShardSpec>> shardSpecs = new TreeMap<>();
       int shardCount = 0;
-      for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
+      for (Interval segmentGranularity : config.getSegmentGranularIntervals()) {
         final Path partitionInfoPath = config.makeSegmentPartitionInfoPath(segmentGranularity);
         if (fileSystem == null) {
           fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration());
@@ -447,7 +447,7 @@ public class DeterminePartitionsJob implements Jobby
 
       final ImmutableMap.Builder<Long, Integer> timeIndexBuilder = ImmutableMap.builder();
       int idx = 0;
-      for (final Interval bucketInterval : config.getGranularitySpec().bucketIntervals().get()) {
+      for (final Interval bucketInterval : config.getGranularitySpec().sortedBucketIntervals()) {
         timeIndexBuilder.put(bucketInterval.getStartMillis(), idx);
         idx++;
       }
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
index a4f719a..8b5b4b6 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
@@ -36,6 +36,7 @@ import java.util.Map;
 import java.util.TreeMap;
 
 /**
+ *
  */
 public class HadoopDruidDetermineConfigurationJob implements Jobby
 {
@@ -75,7 +76,7 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
       }
       Map<Long, List<HadoopyShardSpec>> shardSpecs = new TreeMap<>();
       int shardCount = 0;
-      for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
+      for (Interval segmentGranularity : config.getSegmentGranularIntervals()) {
         DateTime bucket = segmentGranularity.getStart();
         // negative shardsPerInterval means a single shard
         List<HadoopyShardSpec> specs = Lists.newArrayListWithCapacity(shardsPerInterval);
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java
index 0b69b33..54c8f07 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java
@@ -76,10 +76,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
-import java.util.SortedSet;
 
 /**
+ *
  */
 public class HadoopDruidIndexerConfig
 {
@@ -99,7 +98,7 @@ public class HadoopDruidIndexerConfig
   /**
    * Hadoop tasks running in an Indexer process need a reference to the Properties instance created
    * in PropertiesModule so that the task sees properties that were specified in Druid's config files.
-   *
+   * <p>
    * This is not strictly necessary for Peon-based tasks which have all properties, including config file properties,
    * specified on their command line by ForkingTaskRunner (so they could use System.getProperties() only),
    * but we always use the injected Properties for consistency.
@@ -314,9 +313,9 @@ public class HadoopDruidIndexerConfig
 
   public Optional<List<Interval>> getIntervals()
   {
-    Optional<SortedSet<Interval>> setOptional = schema.getDataSchema().getGranularitySpec().bucketIntervals();
-    if (setOptional.isPresent()) {
-      return Optional.of(JodaUtils.condenseIntervals(setOptional.get()));
+    Iterable<Interval> bucketIntervals = schema.getDataSchema().getGranularitySpec().sortedBucketIntervals();
+    if (bucketIntervals.iterator().hasNext()) {
+      return Optional.of(JodaUtils.condenseIntervals(bucketIntervals));
     } else {
       return Optional.absent();
     }
@@ -426,7 +425,6 @@ public class HadoopDruidIndexerConfig
    * Get the proper bucket for some input row.
    *
    * @param inputRow an InputRow
-   *
    * @return the Bucket that this row belongs to
    */
   Optional<Bucket> getBucket(InputRow inputRow)
@@ -455,14 +453,12 @@ public class HadoopDruidIndexerConfig
 
   }
 
-  Optional<Set<Interval>> getSegmentGranularIntervals()
+  Iterable<Interval> getSegmentGranularIntervals()
   {
-    return Optional.fromNullable(
+    return
         schema.getDataSchema()
               .getGranularitySpec()
-              .bucketIntervals()
-              .orNull()
-    );
+              .sortedBucketIntervals();
   }
 
   public List<Interval> getInputIntervals()
@@ -474,15 +470,17 @@ public class HadoopDruidIndexerConfig
 
   Optional<Iterable<Bucket>> getAllBuckets()
   {
-    Optional<Set<Interval>> intervals = getSegmentGranularIntervals();
-    if (intervals.isPresent()) {
+    Iterable<Interval> intervals = getSegmentGranularIntervals();
+    if (intervals.iterator().hasNext()) {
       return Optional.of(
           FunctionalIterable
-              .create(intervals.get())
+              .create(intervals)
               .transformCat(
                   input -> {
                     final DateTime bucketTime = input.getStart();
-                    final List<HadoopyShardSpec> specs = schema.getTuningConfig().getShardSpecs().get(bucketTime.getMillis());
+                    final List<HadoopyShardSpec> specs = schema.getTuningConfig()
+                                                               .getShardSpecs()
+                                                               .get(bucketTime.getMillis());
                     if (specs == null) {
                       return ImmutableList.of();
                     }
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerMapper.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerMapper.java
index 02ced6c..5190280 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerMapper.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerMapper.java
@@ -83,7 +83,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
             throw new ParseException(errorMsg);
           }
 
-          if (!granularitySpec.bucketIntervals().isPresent()
+          if (granularitySpec.inputIntervals().isEmpty()
               || granularitySpec.bucketInterval(DateTimes.utc(inputRow.getTimestampFromEpoch()))
                                 .isPresent()) {
             innerMap(inputRow, context);
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJobTest.java
index 3c809d2..f16ac13 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJobTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJobTest.java
@@ -20,7 +20,6 @@
 package org.apache.druid.indexer;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
@@ -59,7 +58,7 @@ public class HadoopDruidDetermineConfigurationJobTest
     final HadoopDruidIndexerConfig config = Mockito.mock(HadoopDruidIndexerConfig.class);
     Mockito.when(config.isDeterminingPartitions()).thenReturn(false);
     Mockito.when(config.getPartitionsSpec()).thenReturn(partitionsSpec);
-    Mockito.when(config.getSegmentGranularIntervals()).thenReturn(Optional.of(intervals));
+    Mockito.when(config.getSegmentGranularIntervals()).thenReturn(intervals);
     final ArgumentCaptor<Map<Long, List<HadoopyShardSpec>>> resultCaptor = ArgumentCaptor.forClass(Map.class);
     Mockito.doNothing().when(config).setShardSpecs(resultCaptor.capture());
 
@@ -99,7 +98,7 @@ public class HadoopDruidDetermineConfigurationJobTest
     final HadoopDruidIndexerConfig config = Mockito.mock(HadoopDruidIndexerConfig.class);
     Mockito.when(config.isDeterminingPartitions()).thenReturn(false);
     Mockito.when(config.getPartitionsSpec()).thenReturn(partitionsSpec);
-    Mockito.when(config.getSegmentGranularIntervals()).thenReturn(Optional.of(intervals));
+    Mockito.when(config.getSegmentGranularIntervals()).thenReturn(intervals);
     final ArgumentCaptor<Map<Long, List<HadoopyShardSpec>>> resultCaptor = ArgumentCaptor.forClass(Map.class);
     Mockito.doNothing().when(config).setShardSpecs(resultCaptor.capture());
 
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java
index aed3381..823cdd2 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java
@@ -78,7 +78,7 @@ public class HadoopIngestionSpecTest
     Assert.assertEquals(
         "getIntervals",
         Collections.singletonList(Intervals.of("2012-01-01/P1D")),
-        granularitySpec.getIntervals().get()
+        granularitySpec.inputIntervals()
     );
 
     Assert.assertEquals(
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
index 8d59554..c0254c4 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
@@ -599,7 +599,7 @@ public class IndexGeneratorJobTest
     Map<Long, List<HadoopyShardSpec>> shardSpecs = new TreeMap<>(DateTimeComparator.getInstance());
     int shardCount = 0;
     int segmentNum = 0;
-    for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
+    for (Interval segmentGranularity : config.getSegmentGranularIntervals()) {
       List<ShardSpec> specs = constructShardSpecFromShardInfo(partitionType, shardInfoForEachShard[segmentNum++]);
       List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size());
       for (ShardSpec spec : specs) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index f9c725a..4a7fa0d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -22,7 +22,6 @@ package org.apache.druid.indexing.common.task;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import org.apache.druid.data.input.FirehoseFactory;
 import org.apache.druid.data.input.InputFormat;
@@ -48,6 +47,7 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.granularity.GranularityType;
+import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
@@ -69,7 +69,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -263,40 +262,17 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
   }
 
   /**
-   * Attempts to acquire a lock that covers the intervals specified in a certain granularitySpec.
-   *
-   * This method uses {@link GranularitySpec#bucketIntervals()} to get the list of intervals to lock, and passes them
-   * to {@link #determineLockGranularityAndTryLock(TaskActionClient, List)}.
-   *
-   * Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock.
-   *
-   * If {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} is set, or if {@param intervals} is nonempty, then this method
-   * will initialize {@link #taskLockHelper} as a side effect.
-   *
-   * @return whether the lock was acquired
-   */
-  protected boolean determineLockGranularityAndTryLock(
-      TaskActionClient client,
-      GranularitySpec granularitySpec
-  ) throws IOException
-  {
-    final List<Interval> intervals = granularitySpec.bucketIntervals().isPresent()
-                                     ? new ArrayList<>(granularitySpec.bucketIntervals().get())
-                                     : Collections.emptyList();
-    return determineLockGranularityAndTryLock(client, intervals);
-  }
-
-  /**
    * Attempts to acquire a lock that covers certain intervals.
-   *
+   * <p>
    * Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock.
-   *
+   * <p>
    * If {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} is set, or if {@param intervals} is nonempty, then this method
    * will initialize {@link #taskLockHelper} as a side effect.
    *
    * @return whether the lock was acquired
    */
-  boolean determineLockGranularityAndTryLock(TaskActionClient client, List<Interval> intervals) throws IOException
+  public boolean determineLockGranularityAndTryLock(TaskActionClient client, List<Interval> intervals)
+      throws IOException
   {
     final boolean forceTimeChunkLock = getContextValue(
         Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
@@ -325,9 +301,9 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
 
   /**
    * Attempts to acquire a lock that covers certain segments.
-   *
+   * <p>
    * Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock.
-   *
+   * <p>
    * This method will initialize {@link #taskLockHelper} as a side effect.
    *
    * @return whether the lock was acquired
@@ -396,25 +372,33 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
     }
   }
 
+
   protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> intervals) throws IOException
   {
     // The given intervals are first converted to align with segment granularity. This is because,
     // when an overwriting task finds a version for a given input row, it expects the interval
     // associated to each version to be equal or larger than the time bucket where the input row falls in.
     // See ParallelIndexSupervisorTask.findVersion().
-    final Set<Interval> uniqueIntervals = new HashSet<>();
+    final Iterator<Interval> intervalIterator;
     final Granularity segmentGranularity = getSegmentGranularity();
-    for (Interval interval : intervals) {
-      if (segmentGranularity == null) {
-        uniqueIntervals.add(interval);
-      } else {
-        Iterables.addAll(uniqueIntervals, segmentGranularity.getIterable(interval));
-      }
+    if (segmentGranularity == null) {
+      intervalIterator = JodaUtils.condenseIntervals(intervals).iterator();
+    } else {
+      IntervalsByGranularity intervalsByGranularity = new IntervalsByGranularity(intervals, segmentGranularity);
+      // the following calls a condense variant that does not materialize the intervals:
+      intervalIterator = JodaUtils.condensedIntervalsIterator(intervalsByGranularity.granularityIntervalsIterator());
     }
 
-    // Condense intervals to avoid creating too many locks.
-    for (Interval interval : JodaUtils.condenseIntervals(uniqueIntervals)) {
-      final TaskLock lock = client.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval));
+    // Intervals are already condensed to avoid creating too many locks.
+    // Intervals are also sorted and thus it's safe to compare only the previous interval and current one for dedup.
+    Interval prev = null;
+    while (intervalIterator.hasNext()) {
+      final Interval cur = intervalIterator.next();
+      if (prev != null && cur.equals(prev)) {
+        continue;
+      }
+      prev = cur;
+      final TaskLock lock = client.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, cur));
       if (lock == null) {
         return false;
       }
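
For intuition, the lazy condensing done by JodaUtils.condensedIntervalsIterator can be pictured with a minimal, hypothetical sketch (not the actual Druid implementation): it merges a stream of Joda intervals sorted by start (then end) on the fly, so overlapping or abutting buckets collapse without the whole stream ever being held in memory.

    import com.google.common.collect.Iterators;
    import com.google.common.collect.PeekingIterator;
    import org.joda.time.Interval;

    import java.util.Iterator;

    final class CondenseSketch
    {
      // Lazily merges a stream of intervals sorted by start (then end) into
      // non-overlapping condensed intervals, without materializing the input.
      static Iterator<Interval> condensed(Iterator<Interval> sorted)
      {
        final PeekingIterator<Interval> it = Iterators.peekingIterator(sorted);
        return new Iterator<Interval>()
        {
          @Override
          public boolean hasNext()
          {
            return it.hasNext();
          }

          @Override
          public Interval next()
          {
            Interval current = it.next();
            // Extend the current interval while the next one overlaps or abuts it.
            while (it.hasNext() && !current.getEnd().isBefore(it.peek().getStart())) {
              final Interval next = it.next();
              if (next.getEnd().isAfter(current.getEnd())) {
                current = new Interval(current.getStart(), next.getEnd());
              }
            }
            return current;
          }
        };
      }
    }

Because the stream arrives sorted, the lock loop above only ever needs to remember the previous element to skip duplicates, which is exactly what the prev/cur comparison does.
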
@@ -525,11 +509,11 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
    * If the given firehoseFactory is {@link IngestSegmentFirehoseFactory}, then it finds the segments to lock
    * from the firehoseFactory. This is because those segments will be read by this task no matter what segments would be
    * filtered by intervalsToRead, so they need to be locked.
-   *
+   * <p>
    * However, if firehoseFactory is not IngestSegmentFirehoseFactory, this task will overwrite some segments
    * with data read from some input source outside of Druid. As a result, only the segments falling in intervalsToRead
    * should be locked.
-   *
+   * <p>
    * The order of segments within the returned list is unspecified, but each segment is guaranteed to appear in the list
    * only once.
    */
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index 1236e35..c11ce31 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -82,7 +82,6 @@ import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.SortedSet;
 
 public class HadoopIndexTask extends HadoopTask implements ChatHandler
 {
@@ -189,12 +188,10 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
   @Override
   public boolean isReady(TaskActionClient taskActionClient) throws Exception
   {
-    Optional<SortedSet<Interval>> intervals = spec.getDataSchema().getGranularitySpec().bucketIntervals();
-    if (intervals.isPresent()) {
+    Iterable<Interval> intervals = spec.getDataSchema().getGranularitySpec().sortedBucketIntervals();
+    if (intervals.iterator().hasNext()) {
       Interval interval = JodaUtils.umbrellaInterval(
-          JodaUtils.condenseIntervals(
-              intervals.get()
-          )
+          JodaUtils.condenseIntervals(intervals)
       );
       return taskActionClient.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval)) != null;
     } else {
@@ -312,7 +309,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
     registerResourceCloserOnAbnormalExit(config -> killHadoopJob());
     String hadoopJobIdFile = getHadoopJobIdFileName();
     final ClassLoader loader = buildClassLoader(toolbox);
-    boolean determineIntervals = !spec.getDataSchema().getGranularitySpec().bucketIntervals().isPresent();
+    boolean determineIntervals = spec.getDataSchema().getGranularitySpec().inputIntervals().isEmpty();
 
     HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed(
         spec,
@@ -377,7 +374,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
     if (determineIntervals) {
       Interval interval = JodaUtils.umbrellaInterval(
           JodaUtils.condenseIntervals(
-              indexerSchema.getDataSchema().getGranularitySpec().bucketIntervals().get()
+              indexerSchema.getDataSchema().getGranularitySpec().sortedBucketIntervals()
           )
       );
       final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS);
@@ -621,7 +618,9 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
   }
 
 
-  /** Called indirectly in {@link HadoopIndexTask#run(TaskToolbox)}. */
+  /**
+   * Called indirectly in {@link HadoopIndexTask#run(TaskToolbox)}.
+   */
   @SuppressWarnings("unused")
   public static class HadoopDetermineConfigInnerProcessingRunner
   {
@@ -770,9 +769,9 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
             jobId
         });
 
-        return new String[] {jobId, (res == 0 ? "Success" : "Fail")};
+        return new String[]{jobId, (res == 0 ? "Success" : "Fail")};
       }
-      return new String[] {jobId, "Fail"};
+      return new String[]{jobId, "Fail"};
     }
   }
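
The isReady() path above takes a single lock over the umbrella interval of all buckets rather than one lock per bucket. Assuming the input has already been condensed and sorted (which JodaUtils.condenseIntervals guarantees), the umbrella simply spans the earliest start to the latest end; a minimal, hypothetical stand-in:

    import org.joda.time.Interval;

    import java.util.List;

    final class UmbrellaSketch
    {
      // Simplified stand-in for JodaUtils.umbrellaInterval, assuming the list is
      // already condensed and sorted by start: the last interval has the latest end.
      static Interval umbrella(List<Interval> condensed)
      {
        if (condensed.isEmpty()) {
          throw new IllegalArgumentException("empty interval list");
        }
        return new Interval(
            condensed.get(0).getStart(),
            condensed.get(condensed.size() - 1).getEnd()
        );
      }
    }

For example, the condensed intervals 2012-01-01/2012-01-03 and 2012-01-07/2012-01-08 yield the umbrella 2012-01-01/2012-01-08.
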
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index ce9da23..88779ab 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -123,7 +123,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -231,7 +230,10 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
         throw new UOE("partitionsSpec[%s] is not supported", tuningConfig.getPartitionsSpec().getClass().getName());
       }
     }
-    return determineLockGranularityAndTryLock(taskActionClient, ingestionSchema.dataSchema.getGranularitySpec());
+    return determineLockGranularityAndTryLock(
+        taskActionClient,
+        ingestionSchema.dataSchema.getGranularitySpec().inputIntervals()
+    );
   }
 
   @Override
@@ -452,10 +454,10 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
           ingestionSchema.getTuningConfig().getMaxSavedParseExceptions()
       );
 
-      final boolean determineIntervals = !ingestionSchema.getDataSchema()
-                                                         .getGranularitySpec()
-                                                         .bucketIntervals()
-                                                         .isPresent();
+      final boolean determineIntervals = ingestionSchema.getDataSchema()
+                                                        .getGranularitySpec()
+                                                        .inputIntervals()
+                                                        .isEmpty();
 
       final InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
           ingestionSchema.getDataSchema().getParser()
@@ -591,7 +593,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
     final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec();
 
     // Must determine intervals if unknown, since we acquire all locks before processing any data.
-    final boolean determineIntervals = !granularitySpec.bucketIntervals().isPresent();
+    final boolean determineIntervals = granularitySpec.inputIntervals().isEmpty();
 
     // Must determine partitions if rollup is guaranteed and the user didn't provide a specific value.
     final boolean determineNumPartitions = partitionsSpec.needsDeterminePartitions(false);
@@ -630,7 +632,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
       @Nonnull DynamicPartitionsSpec partitionsSpec
   )
   {
-    final SortedSet<Interval> intervals = granularitySpec.bucketIntervals().get();
+    final Iterable<Interval> intervals = granularitySpec.sortedBucketIntervals();
     final int numBucketsPerInterval = 1;
     final LinearPartitionAnalysis partitionAnalysis = new LinearPartitionAnalysis(partitionsSpec);
     intervals.forEach(interval -> partitionAnalysis.updateBucket(interval, numBucketsPerInterval));
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 038e788..bd64fbd 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -109,7 +109,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.SortedSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -141,7 +140,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
    * the segment granularity of existing segments until the task reads all data because we don't know what segments
    * are going to be overwritten. As a result, we assume that segment granularity is going to be changed if intervals
    * are missing and force to use timeChunk lock.
-   *
+   * <p>
    * This variable is initialized in the constructor and used in {@link #run} to log that timeChunk lock was enforced
    * in the task logs.
    */
@@ -194,10 +193,10 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
         ingestionSchema.getDataSchema().getParser()
     );
     this.missingIntervalsInOverwriteMode = !ingestionSchema.getIOConfig().isAppendToExisting()
-                                           && !ingestionSchema.getDataSchema()
-                                                              .getGranularitySpec()
-                                                              .bucketIntervals()
-                                                              .isPresent();
+                                           && ingestionSchema.getDataSchema()
+                                                             .getGranularitySpec()
+                                                             .inputIntervals()
+                                                             .isEmpty();
     if (missingIntervalsInOverwriteMode) {
       addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
     }
@@ -320,12 +319,12 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
   )
   {
     return new PartialRangeSegmentGenerateParallelIndexTaskRunner(
-       toolbox,
-       getId(),
-       getGroupId(),
-       ingestionSchema,
-       getContext(),
-       intervalToPartitions
+        toolbox,
+        getId(),
+        getGroupId(),
+        ingestionSchema,
+        getContext(),
+        intervalToPartitions
     );
   }
 
@@ -350,7 +349,10 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
   @Override
   public boolean isReady(TaskActionClient taskActionClient) throws Exception
   {
-    return determineLockGranularityAndTryLock(taskActionClient, ingestionSchema.getDataSchema().getGranularitySpec());
+    return determineLockGranularityAndTryLock(
+        taskActionClient,
+        ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals()
+    );
   }
 
   @Override
@@ -513,13 +515,13 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
   /**
    * Run the multi phase parallel indexing for perfect rollup. In this mode, the parallel indexing is currently
    * executed in two phases.
-   *
+   * <p>
    * - In the first phase, each task partitions input data and stores those partitions in local storage.
-   *   - The partition is created based on the segment granularity (primary partition key) and the partition dimension
-   *     values in {@link PartitionsSpec} (secondary partition key).
-   *   - Partitioned data is maintained by {@link IntermediaryDataManager}.
+   * - The partition is created based on the segment granularity (primary partition key) and the partition dimension
+   * values in {@link PartitionsSpec} (secondary partition key).
+   * - Partitioned data is maintained by {@link IntermediaryDataManager}.
    * - In the second phase, each task reads partitioned data from the intermediary data server (middleManager
-   *   or indexer) and merges them to create the final segments.
+   * or indexer) and merges them to create the final segments.
    */
   private TaskStatus runMultiPhaseParallel(TaskToolbox toolbox) throws Exception
   {
@@ -817,7 +819,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
   }
 
   private static <S extends PartitionStat, L extends PartitionLocation>
-        Map<Pair<Interval, Integer>, List<L>> groupPartitionLocationsPerPartition(
+      Map<Pair<Interval, Integer>, List<L>> groupPartitionLocationsPerPartition(
       Map<String, ? extends GeneratedPartitionsReport<S>> subTaskIdToReport,
       BiFunction<String, S, L> createPartitionLocationFunction
   )
@@ -891,7 +893,6 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
    * @param index  index of partition
    * @param total  number of items to partition
    * @param splits number of desired partitions
-   *
    * @return partition range: [lhs, rhs)
    */
   private static Pair<Integer, Integer> getPartitionBoundaries(int index, int total, int splits)
@@ -1038,7 +1039,10 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
   {
     final String dataSource = getDataSource();
     final GranularitySpec granularitySpec = getIngestionSchema().getDataSchema().getGranularitySpec();
-    final Optional<SortedSet<Interval>> bucketIntervals = granularitySpec.bucketIntervals();
+    // This method is called whenever subtasks need to allocate a new segment via the supervisor task.
+    // As a result, this code is never called in the Overlord. For now, using the materialized intervals
+    // here is fine for performance reasons.
+    final Set<Interval> materializedBucketIntervals = granularitySpec.materializedBucketIntervals();
 
     // List locks whenever allocating a new segment because locks might be revoked and no longer valid.
     final List<TaskLock> locks = toolbox
@@ -1054,7 +1058,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
 
     Interval interval;
     String version;
-    if (bucketIntervals.isPresent()) {
+    if (!materializedBucketIntervals.isEmpty()) {
       // If granularity spec has explicit intervals, we just need to find the version associated to the interval.
       // This is because we should have gotten all required locks up front when the task starts up.
       final Optional<Interval> maybeInterval = granularitySpec.bucketInterval(timestamp);
@@ -1063,7 +1067,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
       }
 
       interval = maybeInterval.get();
-      if (!bucketIntervals.get().contains(interval)) {
+      if (!materializedBucketIntervals.contains(interval)) {
         throw new ISE("Unspecified interval[%s] in granularitySpec[%s]", interval, granularitySpec);
       }
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
index 6bec35d..a0a6179 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
@@ -106,7 +106,7 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
     );
   }
 
-  @VisibleForTesting  // Only for testing
+  @VisibleForTesting
   PartialDimensionDistributionTask(
       @Nullable String id,
       final String groupId,
@@ -334,7 +334,8 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
       this(queryGranularity, BLOOM_FILTER_EXPECTED_INSERTIONS, BLOOM_FILTER_EXPECTED_FALSE_POSITIVE_PROBABILTY);
     }
 
-    @VisibleForTesting  // to allow controlling false positive rate of bloom filter
+    @VisibleForTesting
+    // to allow controlling the false positive rate of the bloom filter
     DedupInputRowFilter(
         Granularity queryGranularity,
         int bloomFilterExpectedInsertions,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
index b252e5d..da22876 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
@@ -42,7 +42,6 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.SortedSet;
 import java.util.stream.Collectors;
 
 /**
@@ -194,7 +193,7 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<G
       // We only care about the intervals in intervalToNumShardsOverride here.
       intervalToNumShardsOverride.forEach(partitionAnalysis::updateBucket);
     } else {
-      final SortedSet<Interval> intervals = granularitySpec.bucketIntervals().get();
+      final Iterable<Interval> intervals = granularitySpec.sortedBucketIntervals();
       final int numBucketsPerInterval = partitionsSpec.getNumShards() == null
                                         ? 1
                                         : partitionsSpec.getNumShards();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 2cc9f72..ec019b1 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -98,7 +98,7 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask
    * the segment granularity of existing segments until the task reads all data because we don't know what segments
    * are going to be overwritten. As a result, we assume that segment granularity is going to be changed if intervals
    * are missing and force to use timeChunk lock.
-   *
+   * <p>
    * This variable is initialized in the constructor and used in {@link #run} to log that timeChunk lock was enforced
    * in the task logs.
    */
@@ -132,10 +132,10 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask
     this.ingestionSchema = ingestionSchema;
     this.supervisorTaskId = supervisorTaskId;
     this.missingIntervalsInOverwriteMode = !ingestionSchema.getIOConfig().isAppendToExisting()
-                                           && !ingestionSchema.getDataSchema()
-                                                              .getGranularitySpec()
-                                                              .bucketIntervals()
-                                                              .isPresent();
+                                           && ingestionSchema.getDataSchema()
+                                                             .getGranularitySpec()
+                                                             .inputIntervals()
+                                                             .isEmpty();
     if (missingIntervalsInOverwriteMode) {
       addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
     }
@@ -152,7 +152,7 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask
   {
     return determineLockGranularityAndTryLock(
         new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
-        ingestionSchema.getDataSchema().getGranularitySpec()
+        ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals()
     );
   }
 
@@ -263,7 +263,7 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask
    * If the number of rows added to {@link BaseAppenderatorDriver} so far exceeds {@link DynamicPartitionsSpec#maxTotalRows}
    * </li>
    * </ul>
-   *
+   * <p>
    * At the end of this method, all the remaining segments are published.
    *
    * @return true if generated segments are successfully published, otherwise false
@@ -291,7 +291,7 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask
     final ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
     final DynamicPartitionsSpec partitionsSpec = (DynamicPartitionsSpec) tuningConfig.getGivenOrDefaultPartitionsSpec();
     final long pushTimeout = tuningConfig.getPushTimeout();
-    final boolean explicitIntervals = granularitySpec.bucketIntervals().isPresent();
+    final boolean explicitIntervals = !granularitySpec.inputIntervals().isEmpty();
     final SegmentAllocator segmentAllocator = SegmentAllocators.forLinearPartitioning(
         toolbox,
         getId(),
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java b/server/src/main/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java
index b2e1831..0f97749 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java
@@ -21,29 +21,20 @@ package org.apache.druid.segment.indexing.granularity;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
-import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
-import org.apache.druid.java.util.common.guava.Comparators;
-import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeSet;
 
-public class ArbitraryGranularitySpec implements GranularitySpec
+public class ArbitraryGranularitySpec extends BaseGranularitySpec
 {
-  private final TreeSet<Interval> intervals;
   private final Granularity queryGranularity;
-  private final Boolean rollup;
+  protected LookupIntervalBuckets lookupTableBucketByDateTime;
 
   @JsonCreator
   public ArbitraryGranularitySpec(
@@ -52,22 +43,15 @@ public class ArbitraryGranularitySpec implements GranularitySpec
       @JsonProperty("intervals") @Nullable List<Interval> inputIntervals
   )
   {
+    super(inputIntervals, rollup);
     this.queryGranularity = queryGranularity == null ? Granularities.NONE : queryGranularity;
-    this.rollup = rollup == null ? Boolean.TRUE : rollup;
-    this.intervals = new TreeSet<>(Comparators.intervalsByStartThenEnd());
 
-    if (inputIntervals == null) {
-      inputIntervals = new ArrayList<>();
-    }
-
-    // Insert all intervals
-    intervals.addAll(inputIntervals);
+    lookupTableBucketByDateTime = new LookupIntervalBuckets(inputIntervals);
 
     // Ensure intervals are non-overlapping (but they may abut each other)
-    final PeekingIterator<Interval> intervalIterator = Iterators.peekingIterator(intervals.iterator());
+    final PeekingIterator<Interval> intervalIterator = Iterators.peekingIterator(sortedBucketIntervals().iterator());
     while (intervalIterator.hasNext()) {
       final Interval currentInterval = intervalIterator.next();
-
       if (intervalIterator.hasNext()) {
         final Interval nextInterval = intervalIterator.peek();
         if (currentInterval.overlaps(nextInterval)) {
@@ -86,29 +70,9 @@ public class ArbitraryGranularitySpec implements GranularitySpec
   }
 
   @Override
-  @JsonProperty("intervals")
-  public Optional<SortedSet<Interval>> bucketIntervals()
-  {
-    return Optional.of(intervals);
-  }
-
-  @Override
-  public List<Interval> inputIntervals()
-  {
-    return ImmutableList.copyOf(intervals);
-  }
-
-  @Override
-  public Optional<Interval> bucketInterval(DateTime dt)
+  public Iterable<Interval> sortedBucketIntervals()
   {
-    // First interval with start time ≤ dt
-    final Interval interval = intervals.floor(new Interval(dt, DateTimes.MAX));
-
-    if (interval != null && interval.contains(dt)) {
-      return Optional.of(interval);
-    } else {
-      return Optional.absent();
-    }
+    return () -> lookupTableBucketByDateTime.iterator();
   }
 
   @Override
@@ -118,13 +82,6 @@ public class ArbitraryGranularitySpec implements GranularitySpec
   }
 
   @Override
-  @JsonProperty("rollup")
-  public boolean isRollup()
-  {
-    return rollup;
-  }
-
-  @Override
   @JsonProperty("queryGranularity")
   public Granularity getQueryGranularity()
   {
@@ -143,7 +100,7 @@ public class ArbitraryGranularitySpec implements GranularitySpec
 
     ArbitraryGranularitySpec that = (ArbitraryGranularitySpec) o;
 
-    if (!intervals.equals(that.intervals)) {
+    if (!inputIntervals().equals(that.inputIntervals())) {
       return false;
     }
     if (!rollup.equals(that.rollup)) {
@@ -159,7 +116,7 @@ public class ArbitraryGranularitySpec implements GranularitySpec
   @Override
   public int hashCode()
   {
-    int result = intervals.hashCode();
+    int result = inputIntervals().hashCode();
     result = 31 * result + rollup.hashCode();
     result = 31 * result + (queryGranularity != null ? queryGranularity.hashCode() : 0);
     return result;
@@ -169,7 +126,7 @@ public class ArbitraryGranularitySpec implements GranularitySpec
   public String toString()
   {
     return "ArbitraryGranularitySpec{" +
-           "intervals=" + intervals +
+           "intervals=" + inputIntervals() +
            ", queryGranularity=" + queryGranularity +
            ", rollup=" + rollup +
            '}';
@@ -180,4 +137,11 @@ public class ArbitraryGranularitySpec implements GranularitySpec
   {
     return new ArbitraryGranularitySpec(queryGranularity, rollup, inputIntervals);
   }
+
+  @Override
+  protected LookupIntervalBuckets getLookupTableBuckets()
+  {
+    return lookupTableBucketByDateTime;
+  }
+
 }
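
The constructor check above relies on the buckets arriving sorted, so a single peek ahead is enough to detect overlap while still allowing abutting intervals (Joda's Interval.overlaps returns false for intervals that merely share an endpoint). A condensed sketch of the same pattern, with hypothetical names:

    import com.google.common.collect.Iterators;
    import com.google.common.collect.PeekingIterator;
    import org.apache.druid.java.util.common.IAE;
    import org.joda.time.Interval;

    import java.util.Iterator;

    final class OverlapCheckSketch
    {
      // Sorted intervals may abut (end == next start) but must not overlap.
      static void ensureNonOverlapping(Iterator<Interval> sortedIntervals)
      {
        final PeekingIterator<Interval> it = Iterators.peekingIterator(sortedIntervals);
        while (it.hasNext()) {
          final Interval current = it.next();
          if (it.hasNext() && current.overlaps(it.peek())) {
            throw new IAE("Overlapping intervals: %s, %s", current, it.peek());
          }
        }
      }
    }
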
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java b/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java
new file mode 100644
index 0000000..9ff114b
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.indexing.granularity;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeSet;
+
+abstract class BaseGranularitySpec implements GranularitySpec
+{
+  protected List<Interval> inputIntervals;
+  protected final Boolean rollup;
+
+  public BaseGranularitySpec(List<Interval> inputIntervals, Boolean rollup)
+  {
+    if (inputIntervals != null) {
+      this.inputIntervals = ImmutableList.copyOf(inputIntervals);
+    } else {
+      this.inputIntervals = Collections.emptyList();
+    }
+    this.rollup = rollup == null ? Boolean.TRUE : rollup;
+  }
+
+  @Override
+  @JsonProperty("intervals")
+  public List<Interval> inputIntervals()
+  {
+    return inputIntervals;
+  }
+
+  @Override
+  @JsonProperty("rollup")
+  public boolean isRollup()
+  {
+    return rollup;
+  }
+
+  @Override
+  public Optional<Interval> bucketInterval(DateTime dt)
+  {
+    return getLookupTableBuckets().bucketInterval(dt);
+  }
+
+  @Override
+  public TreeSet<Interval> materializedBucketIntervals()
+  {
+    return getLookupTableBuckets().materializedIntervals();
+  }
+
+  protected abstract LookupIntervalBuckets getLookupTableBuckets();
+
+  /**
+   * This is a helper class that facilitates sharing the sortedBucketIntervals code among
+   * the various GranularitySpec implementations. In particular, UniformGranularitySpec
+   * needs to avoid materializing the intervals when they only need to be traversed.
+   */
+  protected static class LookupIntervalBuckets
+  {
+    private final Iterable<Interval> intervalIterable;
+    private final TreeSet<Interval> intervals;
+
+    /**
+     * @param intervalIterable The intervals to materialize
+     */
+    public LookupIntervalBuckets(Iterable<Interval> intervalIterable)
+    {
+      this.intervalIterable = intervalIterable;
+      // The tree set is materialized on demand (see below) so that merely constructing
+      // this data structure does not blow up client code when the number of
+      // intervals is very large...
+      this.intervals = new TreeSet<>(Comparators.intervalsByStartThenEnd());
+    }
+
+    /**
+     * Returns a bucket interval using a fast lookup into an efficient data structure
+     * where all the intervals have been materialized.
+     *
+     * @param dt The date time to look up
+     * @return An Optional containing the interval for the given DateTime if it exists
+     */
+    public Optional<Interval> bucketInterval(DateTime dt)
+    {
+      final Interval interval = materializedIntervals().floor(new Interval(dt, DateTimes.MAX));
+      if (interval != null && interval.contains(dt)) {
+        return Optional.of(interval);
+      } else {
+        return Optional.absent();
+      }
+    }
+
+    /**
+     * @return An iterator over the materialized intervals, in the order dictated by
+     * Comparators.intervalsByStartThenEnd()
+     */
+    public Iterator<Interval> iterator()
+    {
+      return materializedIntervals().iterator();
+    }
+
+    /**
+     * Helper method that materializes the intervals from the iterable on first use and caches them
+     *
+     * @return The TreeSet of materialized intervals
+     */
+    public TreeSet<Interval> materializedIntervals()
+    {
+      if (intervalIterable != null && intervalIterable.iterator().hasNext() && intervals.isEmpty()) {
+        Iterators.addAll(intervals, intervalIterable.iterator());
+      }
+      return intervals;
+    }
+  }
+}
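
To make the floor() lookup concrete: pairing the probe instant with DateTimes.MAX builds the largest possible interval starting at dt, so under the start-then-end comparator every bucket that starts at or before dt compares as less than or equal, and floor() returns the last such bucket. A small demo under those assumptions (the class name is invented; the utilities are the same ones used above):

    import org.apache.druid.java.util.common.DateTimes;
    import org.apache.druid.java.util.common.Intervals;
    import org.apache.druid.java.util.common.guava.Comparators;
    import org.joda.time.DateTime;
    import org.joda.time.Interval;

    import java.util.TreeSet;

    final class FloorLookupDemo
    {
      public static void main(String[] args)
      {
        final TreeSet<Interval> buckets = new TreeSet<>(Comparators.intervalsByStartThenEnd());
        buckets.add(Intervals.of("2012-01-01T00Z/2012-01-03T00Z"));
        buckets.add(Intervals.of("2012-01-07T00Z/2012-01-08T00Z"));

        // 2012-01-05 falls in the gap between the two buckets: floor() returns
        // the first bucket, but contains() rejects it, so the lookup misses.
        final DateTime dt = DateTimes.of("2012-01-05T00Z");
        final Interval candidate = buckets.floor(new Interval(dt, DateTimes.MAX));
        System.out.println(candidate != null && candidate.contains(dt)); // false
      }
    }
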
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java b/server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java
index 9272f69..c3bb579 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java
@@ -27,7 +27,7 @@ import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import java.util.List;
-import java.util.SortedSet;
+import java.util.TreeSet;
 
 /**
  * Tells the indexer how to group events based on timestamp. The events may then be further partitioned based
@@ -41,11 +41,13 @@ import java.util.SortedSet;
 public interface GranularitySpec
 {
   /**
-   * Set of all time groups, broken up on segment boundaries. Should be sorted by interval start and non-overlapping.
+   * Iterable of all time groups, broken up on segment boundaries. Should be sorted by interval start and non-overlapping.
    *
-   * @return set of all time groups
+   * @return Iterable of all time groups
    */
-  Optional<SortedSet<Interval>> bucketIntervals();
+  Iterable<Interval> sortedBucketIntervals();
 
   /**
    * Returns user provided intervals as-is state. used for configuring granular path spec
@@ -58,11 +60,18 @@ public interface GranularitySpec
    * Time-grouping interval corresponding to some instant, if any.
    *
    * @param dt instant to return time interval for
-   *
    * @return optional time interval
    */
   Optional<Interval> bucketInterval(DateTime dt);
 
+  /**
+   * This is a helper method for areas of the code outside the Overlord that, for performance
+   * reasons, might need the materialized set of bucket intervals.
+   *
+   * @return A fast-lookup, ordered set of the materialized bucket intervals
+   */
+  TreeSet<Interval> materializedBucketIntervals();
+
   Granularity getSegmentGranularity();
 
   boolean isRollup();
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java b/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java
index fae680a..5fe6a9e 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java
@@ -21,28 +21,22 @@ package org.apache.druid.segment.indexing.granularity;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
-import org.joda.time.DateTime;
+import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
 import org.joda.time.Interval;
 
-import java.util.ArrayList;
 import java.util.List;
-import java.util.SortedSet;
 
-public class UniformGranularitySpec implements GranularitySpec
+public class UniformGranularitySpec extends BaseGranularitySpec
 {
   private static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.DAY;
   private static final Granularity DEFAULT_QUERY_GRANULARITY = Granularities.NONE;
 
   private final Granularity segmentGranularity;
   private final Granularity queryGranularity;
-  private final Boolean rollup;
-  private final List<Interval> inputIntervals;
-  private final ArbitraryGranularitySpec wrappedSpec;
+  private final IntervalsByGranularity intervalsByGranularity;
+  protected LookupIntervalBuckets lookupTableBucketByDateTime;
 
   @JsonCreator
   public UniformGranularitySpec(
@@ -52,21 +46,11 @@ public class UniformGranularitySpec implements GranularitySpec
       @JsonProperty("intervals") List<Interval> inputIntervals
   )
   {
+    super(inputIntervals, rollup);
     this.queryGranularity = queryGranularity == null ? DEFAULT_QUERY_GRANULARITY : queryGranularity;
-    this.rollup = rollup == null ? Boolean.TRUE : rollup;
     this.segmentGranularity = segmentGranularity == null ? DEFAULT_SEGMENT_GRANULARITY : segmentGranularity;
-
-    if (inputIntervals != null) {
-      List<Interval> granularIntervals = new ArrayList<>();
-      for (Interval inputInterval : inputIntervals) {
-        Iterables.addAll(granularIntervals, this.segmentGranularity.getIterable(inputInterval));
-      }
-      this.inputIntervals = ImmutableList.copyOf(inputIntervals);
-      this.wrappedSpec = new ArbitraryGranularitySpec(queryGranularity, rollup, granularIntervals);
-    } else {
-      this.inputIntervals = null;
-      this.wrappedSpec = null;
-    }
+    intervalsByGranularity = new IntervalsByGranularity(this.inputIntervals, segmentGranularity);
+    lookupTableBucketByDateTime = new LookupIntervalBuckets(sortedBucketIntervals());
   }
 
   public UniformGranularitySpec(
@@ -79,29 +63,9 @@ public class UniformGranularitySpec implements GranularitySpec
   }
 
   @Override
-  public Optional<SortedSet<Interval>> bucketIntervals()
-  {
-    if (wrappedSpec == null) {
-      return Optional.absent();
-    } else {
-      return wrappedSpec.bucketIntervals();
-    }
-  }
-
-  @Override
-  public List<Interval> inputIntervals()
+  public Iterable<Interval> sortedBucketIntervals()
   {
-    return inputIntervals == null ? ImmutableList.of() : ImmutableList.copyOf(inputIntervals);
-  }
-
-  @Override
-  public Optional<Interval> bucketInterval(DateTime dt)
-  {
-    if (wrappedSpec == null) {
-      return Optional.absent();
-    } else {
-      return wrappedSpec.bucketInterval(dt);
-    }
+    return () -> intervalsByGranularity.granularityIntervalsIterator();
   }
 
   @Override
@@ -112,25 +76,12 @@ public class UniformGranularitySpec implements GranularitySpec
   }
 
   @Override
-  @JsonProperty("rollup")
-  public boolean isRollup()
-  {
-    return rollup;
-  }
-
-  @Override
   @JsonProperty("queryGranularity")
   public Granularity getQueryGranularity()
   {
     return queryGranularity;
   }
 
-  @JsonProperty("intervals")
-  public Optional<List<Interval>> getIntervals()
-  {
-    return Optional.fromNullable(inputIntervals);
-  }
-
   @Override
   public boolean equals(Object o)
   {
@@ -149,14 +100,15 @@ public class UniformGranularitySpec implements GranularitySpec
     if (!queryGranularity.equals(that.queryGranularity)) {
       return false;
     }
-    if (!rollup.equals(that.rollup)) {
+    if (isRollup() != that.isRollup()) {
       return false;
     }
 
     if (inputIntervals != null ? !inputIntervals.equals(that.inputIntervals) : that.inputIntervals != null) {
       return false;
     }
-    return !(wrappedSpec != null ? !wrappedSpec.equals(that.wrappedSpec) : that.wrappedSpec != null);
+
+    return true;
 
   }
 
@@ -167,7 +119,6 @@ public class UniformGranularitySpec implements GranularitySpec
     result = 31 * result + queryGranularity.hashCode();
     result = 31 * result + rollup.hashCode();
     result = 31 * result + (inputIntervals != null ? inputIntervals.hashCode() : 0);
-    result = 31 * result + (wrappedSpec != null ? wrappedSpec.hashCode() : 0);
     return result;
   }
 
@@ -179,7 +130,6 @@ public class UniformGranularitySpec implements GranularitySpec
            ", queryGranularity=" + queryGranularity +
            ", rollup=" + rollup +
            ", inputIntervals=" + inputIntervals +
-           ", wrappedSpec=" + wrappedSpec +
            '}';
   }
 
@@ -188,4 +138,11 @@ public class UniformGranularitySpec implements GranularitySpec
   {
     return new UniformGranularitySpec(segmentGranularity, queryGranularity, rollup, inputIntervals);
   }
+
+  @Override
+  protected LookupIntervalBuckets getLookupTableBuckets()
+  {
+    return lookupTableBucketByDateTime;
+  }
+
 }
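
The payoff of dropping the wrapped ArbitraryGranularitySpec is that UniformGranularitySpec no longer expands every input interval into granular buckets at construction time. Conceptually, IntervalsByGranularity chains the inputs through the granularity's bucket iterable the way the following sketch does (the real class also condenses the inputs and deduplicates repeated buckets, which this sketch omits):

    import com.google.common.collect.FluentIterable;
    import org.apache.druid.java.util.common.Intervals;
    import org.apache.druid.java.util.common.granularity.Granularities;
    import org.joda.time.Interval;

    import java.util.Collections;

    final class LazyBucketsDemo
    {
      public static void main(String[] args)
      {
        // Each input interval is expanded into SECOND buckets lazily; nothing
        // is materialized until the iterator is consumed.
        final Iterable<Interval> lazyBuckets = FluentIterable
            .from(Collections.singletonList(Intervals.of("2012-01-01T00Z/P10Y")))
            .transformAndConcat(interval -> Granularities.SECOND.getIterable(interval));

        // Touching only the first bucket never allocates the ~315 million others.
        System.out.println(lazyBuckets.iterator().next());
      }
    }

This is exactly what the large-number-of-intervals regression test added below exercises.
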
diff --git a/server/src/test/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularityTest.java b/server/src/test/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularityTest.java
index d7dff7a..a015422 100644
--- a/server/src/test/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularityTest.java
+++ b/server/src/test/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularityTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.segment.indexing.granularity;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
@@ -47,7 +48,8 @@ public class ArbitraryGranularityTest
             Intervals.of("2012-01-07T00Z/2012-01-08T00Z"),
             Intervals.of("2012-01-03T00Z/2012-01-04T00Z"),
             Intervals.of("2012-01-01T00Z/2012-01-03T00Z")
-        ));
+        )
+    );
     Assert.assertNotNull(spec.getQueryGranularity());
   }
 
@@ -57,12 +59,13 @@ public class ArbitraryGranularityTest
     final GranularitySpec spec = new ArbitraryGranularitySpec(
         Granularities.NONE,
         Lists.newArrayList(
-        Intervals.of("2012-01-08T00Z/2012-01-11T00Z"),
-        Intervals.of("2012-02-01T00Z/2012-03-01T00Z"),
-        Intervals.of("2012-01-07T00Z/2012-01-08T00Z"),
-        Intervals.of("2012-01-03T00Z/2012-01-04T00Z"),
-        Intervals.of("2012-01-01T00Z/2012-01-03T00Z")
-    ));
+            Intervals.of("2012-01-08T00Z/2012-01-11T00Z"),
+            Intervals.of("2012-02-01T00Z/2012-03-01T00Z"),
+            Intervals.of("2012-01-07T00Z/2012-01-08T00Z"),
+            Intervals.of("2012-01-03T00Z/2012-01-04T00Z"),
+            Intervals.of("2012-01-01T00Z/2012-01-03T00Z")
+        )
+    );
 
     Assert.assertTrue(spec.isRollup());
 
@@ -74,7 +77,17 @@ public class ArbitraryGranularityTest
             Intervals.of("2012-01-08T00Z/2012-01-11T00Z"),
             Intervals.of("2012-02-01T00Z/2012-03-01T00Z")
         ),
-        Lists.newArrayList(spec.bucketIntervals().get())
+        Lists.newArrayList(spec.sortedBucketIntervals())
+    );
+
+    Assert.assertEquals(
+        Optional.of(Intervals.of("2012-01-01T00Z/2012-01-03T00Z")),
+        spec.bucketInterval(DateTimes.of("2012-01-01T00Z"))
+    );
+
+    Assert.assertEquals(
+        Optional.of(Intervals.of("2012-01-08T00Z/2012-01-11T00Z")),
+        spec.bucketInterval(DateTimes.of("2012-01-08T00Z"))
     );
 
     Assert.assertEquals(
@@ -118,6 +131,7 @@ public class ArbitraryGranularityTest
         Optional.absent(),
         spec.bucketInterval(DateTimes.of("2012-01-05T00Z"))
     );
+
   }
 
   @Test
@@ -187,10 +201,26 @@ public class ArbitraryGranularityTest
 
     try {
       final GranularitySpec rtSpec = JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsString(spec), GranularitySpec.class);
-      Assert.assertEquals("Round-trip", spec.bucketIntervals(), rtSpec.bucketIntervals());
+      Assert.assertEquals(
+          "Round-trip",
+          ImmutableList.copyOf(spec.sortedBucketIntervals()),
+          ImmutableList.copyOf(rtSpec.sortedBucketIntervals())
+      );
+      Assert.assertEquals(
+          "Round-trip",
+          ImmutableList.copyOf(spec.inputIntervals()),
+          ImmutableList.copyOf(rtSpec.inputIntervals())
+      );
     }
     catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
+
+  @Test
+  public void testNullInputIntervals()
+  {
+    final GranularitySpec spec = new ArbitraryGranularitySpec(Granularities.NONE, null);
+    Assert.assertFalse(spec.sortedBucketIntervals().iterator().hasNext());
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/segment/indexing/granularity/UniformGranularityTest.java b/server/src/test/java/org/apache/druid/segment/indexing/granularity/UniformGranularityTest.java
index 402f7fd..27760f6 100644
--- a/server/src/test/java/org/apache/druid/segment/indexing/granularity/UniformGranularityTest.java
+++ b/server/src/test/java/org/apache/druid/segment/indexing/granularity/UniformGranularityTest.java
@@ -21,6 +21,8 @@ package org.apache.druid.segment.indexing.granularity;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
@@ -34,8 +36,8 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.SortedSet;
 
 public class UniformGranularityTest
 {
@@ -44,20 +46,27 @@ public class UniformGranularityTest
   @Test
   public void testSimple()
   {
+
+    final List<Interval> inputIntervals = Lists.newArrayList(
+        Intervals.of("2012-01-08T00Z/2012-01-11T00Z"),
+        Intervals.of("2012-01-07T00Z/2012-01-08T00Z"),
+        Intervals.of("2012-01-03T00Z/2012-01-04T00Z"),
+        Intervals.of("2012-01-01T00Z/2012-01-03T00Z")
+    );
     final GranularitySpec spec = new UniformGranularitySpec(
         Granularities.DAY,
         null,
-        Lists.newArrayList(
-            Intervals.of("2012-01-08T00Z/2012-01-11T00Z"),
-            Intervals.of("2012-01-07T00Z/2012-01-08T00Z"),
-            Intervals.of("2012-01-03T00Z/2012-01-04T00Z"),
-            Intervals.of("2012-01-01T00Z/2012-01-03T00Z")
-        )
+        inputIntervals
     );
 
     Assert.assertTrue(spec.isRollup());
 
     Assert.assertEquals(
+        inputIntervals,
+        Lists.newArrayList(spec.inputIntervals())
+    );
+
+    Assert.assertEquals(
         Lists.newArrayList(
             Intervals.of("2012-01-01T00Z/P1D"),
             Intervals.of("2012-01-02T00Z/P1D"),
@@ -67,7 +76,28 @@ public class UniformGranularityTest
             Intervals.of("2012-01-09T00Z/P1D"),
             Intervals.of("2012-01-10T00Z/P1D")
         ),
-        Lists.newArrayList(spec.bucketIntervals().get())
+        Lists.newArrayList(spec.sortedBucketIntervals())
+    );
+
+
+    Assert.assertEquals(
+        Optional.<Interval>absent(),
+        spec.bucketInterval(DateTimes.of("2011-01-12T00Z"))
+    );
+
+    Assert.assertEquals(
+        Optional.of(Intervals.of("2012-01-01T00Z/2012-01-02T00Z")),
+        spec.bucketInterval(DateTimes.of("2012-01-01T00Z"))
+    );
+
+    Assert.assertEquals(
+        Optional.of(Intervals.of("2012-01-10T00Z/2012-01-11T00Z")),
+        spec.bucketInterval(DateTimes.of("2012-01-10T00Z"))
+    );
+
+    Assert.assertEquals(
+        Optional.<Interval>absent(),
+        spec.bucketInterval(DateTimes.of("2012-01-12T00Z"))
     );
 
     Assert.assertEquals(
@@ -99,6 +129,7 @@ public class UniformGranularityTest
         Optional.of(Intervals.of("2012-01-08T00Z/2012-01-09T00Z")),
         spec.bucketInterval(DateTimes.of("2012-01-08T01Z"))
     );
+
   }
 
   @Test
@@ -132,9 +163,9 @@ public class UniformGranularityTest
     try {
       final GranularitySpec rtSpec = JOSN_MAPPER.readValue(JOSN_MAPPER.writeValueAsString(spec), GranularitySpec.class);
       Assert.assertEquals(
-          "Round-trip bucketIntervals",
-          spec.bucketIntervals(),
-          rtSpec.bucketIntervals()
+          "Round-trip sortedBucketIntervals",
+          ImmutableList.copyOf(spec.sortedBucketIntervals()),
+          ImmutableList.copyOf(rtSpec.sortedBucketIntervals().iterator())
       );
       Assert.assertEquals(
           "Round-trip granularity",
@@ -253,10 +284,9 @@ public class UniformGranularityTest
         )
     );
 
-    Assert.assertTrue(spec.bucketIntervals().isPresent());
+    Assert.assertTrue(spec.sortedBucketIntervals().iterator().hasNext());
 
-    final Optional<SortedSet<Interval>> sortedSetOptional = spec.bucketIntervals();
-    final SortedSet<Interval> intervals = sortedSetOptional.get();
+    final Iterable<Interval> intervals = spec.sortedBucketIntervals();
     ArrayList<Long> actualIntervals = new ArrayList<>();
     for (Interval interval : intervals) {
       actualIntervals.add(interval.toDurationMillis());
@@ -279,6 +309,25 @@ public class UniformGranularityTest
     Assert.assertEquals(expectedIntervals, actualIntervals);
   }
 
+  @Test
+  public void testUniformGranularitySpecWithLargeNumberOfIntervalsDoesNotBlowUp()
+  {
+    // Make sure the intervals of a uniform spec are not materialized (which could cause OOM) at construction time
+    final GranularitySpec spec = new UniformGranularitySpec(
+        Granularities.SECOND,
+        null,
+        Collections.singletonList(
+            Intervals.of("2012-01-01T00Z/P10Y")
+        )
+    );
+
+    Assert.assertNotNull(spec);
+
+    int count = Iterators.size(spec.sortedBucketIntervals().iterator());
+    // 10 years of seconds plus 3 leap days (2012, 2016, 2020): 315,619,200 buckets
+    Assert.assertEquals(3600 * 24 * 365 * 10 + 3 * 24 * 3600, count);
+  }
+
   private void notEqualsCheck(GranularitySpec spec1, GranularitySpec spec2)
   {
     Assert.assertNotEquals(spec1, spec2);

