You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/01/11 20:05:11 UTC

[GitHub] [druid] loquisgon opened a new pull request #10742: Granularity interval materialization

loquisgon opened a new pull request #10742:
URL: https://github.com/apache/druid/pull/10742


   <!-- Thanks for trying to help us make Apache Druid be the best it can be! Please fill out as much of the following information as is possible (where relevant, and remove it when irrelevant) to help make the intention and scope of this PR clear in order to ease review. -->
   
   Fixes #XXXX.
   
   <!-- Replace XXXX with the id of the issue fixed in this PR. Remove this section if there is no corresponding issue. Don't reference the issue in the title of this pull-request. -->
   
   <!-- If you are a committer, follow the PR action item checklist for committers:
   https://github.com/apache/druid/blob/master/dev/committer-instructions.md#pr-and-issue-action-item-checklist-for-committers. -->
   
   ### Description
   
   <!-- Describe the goal of this PR, what problem are you fixing. If there is a corresponding issue (referenced above), it's not necessary to repeat the description here, however, you may choose to keep one summary sentence. -->
   
   <!-- Describe your patch: what did you change in code? How did you fix the problem? -->
   
   <!-- If there are several relatively logically separate changes in this PR, create a mini-section for each of them. For example: -->
   
   #### Fixed the bug ...
   #### Renamed the class ...
   #### Added a forbidden-apis entry ...
   
   <!--
   In each section, please describe design decisions made, including:
    - Choice of algorithms
    - Behavioral aspects. What configuration values are acceptable? How are corner cases and error conditions handled, such as when there are insufficient resources?
    - Class organization and design (how the logic is split between classes, inheritance, composition, design patterns)
    - Method organization and design (how the logic is split between methods, parameters and return types)
    - Naming (class, method, API, configuration, HTTP endpoint, names of emitted metrics)
   -->
   
   
   <!-- It's good to describe an alternative design (or mention an alternative name) for every design (or naming) decision point and compare the alternatives with the designs that you've implemented (or the names you've chosen) to highlight the advantages of the chosen designs and names. -->
   
   <!-- If there was a discussion of the design of the feature implemented in this PR elsewhere (e. g. a "Proposal" issue, any other issue, or a thread in the development mailing list), link to that discussion from this PR description and explain what have changed in your final design compared to your original proposal or the consensus version in the end of the discussion. If something hasn't changed since the original discussion, you can omit a detailed discussion of those aspects of the design here, perhaps apart from brief mentioning for the sake of readability of this PR description. -->
   
   <!-- Some of the aspects mentioned above may be omitted for simple and small changes. -->
   
   <hr>
   
   This PR has:
   - [ ] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/licenses.yaml)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   
   <!-- Check the items by putting "x" in the brackets for the done things. Not all of these items apply to every PR. Remove the items which are not done or not relevant to the PR. None of the items from the checklist above are strictly necessary, but it would be very helpful if you at least self-review the PR. -->
   
   <hr>
   
   ##### Key changed/added classes in this PR
    * `MyFoo`
    * `OurBar`
    * `TheirBaz`
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r562027501



##########
File path: server/src/test/java/org/apache/druid/segment/indexing/granularity/IntervalsByGranularityTest.java
##########
@@ -76,6 +79,21 @@ public void testDups()
     Assert.assertTrue(count == 61);
   }
 
+
+  @Test
+  public void testCondenseDoesNotMaterialize()

Review comment:
       It only does indirectly, if it materialized it would create 1 year worth of seconds amount of intervals (over 31 million). If it materialized I reasoned that it would slow down the test or something. I can remove it or rename it. Let me know if you think of a better way to test that. I don't want to measure memory before and after using the `Runtime.getRuntime()` methods because they are not very precise either.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r564678335



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -373,23 +373,25 @@ private boolean tryLockWithDetermineResult(TaskActionClient client, LockGranular
     }
   }
 
+
   protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> intervals) throws IOException
   {
-
-    // In this case, the intervals to lock must be aligned with segmentGranularity if it's defined
-    final Iterator<Interval> intervalIterator;
+    // The given intervals are first converted to align with segment granularity. This is because,
+    // when an overwriting task finds a version for a given input row, it expects the interval
+    // associated to each version to be equal or larger than the time bucket where the input row falls in.
+    // See ParallelIndexSupervisorTask.findVersion().
+    final Set<Interval> uniqueCondensedIntervals = new HashSet<>();
     final Granularity segmentGranularity = getSegmentGranularity();
     if (segmentGranularity == null) {
-      Set<Interval> uniqueIntervals = new HashSet<>(intervals);
-      ArrayList<Interval> condensedIntervals = JodaUtils.condenseIntervals(() -> uniqueIntervals.iterator());
-      intervalIterator = condensedIntervals.iterator();
+      uniqueCondensedIntervals.addAll(JodaUtils.condenseIntervals(intervals));
     } else {
       IntervalsByGranularity intervalsByGranularity = new IntervalsByGranularity(intervals, segmentGranularity);
-      intervalIterator = intervalsByGranularity.granularityIntervalsIterator();
+      // the following is calling a condense that does not materialize the intervals:
+      uniqueCondensedIntervals.addAll(JodaUtils.condenseIntervals(intervalsByGranularity.granularityIntervalsIterator()));

Review comment:
       I like your suggestion so I followed it and will avoid materializing them.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r564203826



##########
File path: core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java
##########
@@ -50,15 +58,31 @@
         sortedIntervals.add(interval);
       }
     }
+    return condenseIntervals(sortedIntervals.iterator());
+  }
 
-    if (sortedIntervals.isEmpty()) {
-      return new ArrayList<>();
+  /**
+   * This method does not materialize the intervals represented by the
+   * sortedIntervals iterator. However, caller needs to insure that sortedIntervals
+   * is already sorted in ascending order.

Review comment:
       Since `Interval` is not `Comparable` (it's not because there could be multiple ways to compare them), it would be nice to explicitly state the order should be `Comparators.intervalsByStartThenEnd()` to make it more clear.

##########
File path: core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.granularity;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.Sets;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Produce a stream of intervals generated by a given set of intervals as input and a given
+ * granularity. This class avoids materializing the granularity intervals whenever possible.
+ */
+public class IntervalsByGranularity
+{
+  private final List<Interval> sortedIntervals;
+  private final Granularity granularity;
+
+  public IntervalsByGranularity(Collection<Interval> intervals, Granularity granularity)
+  {
+    // eliminate dups & sort intervals:
+    HashSet<Interval> intervalSet = Sets.newHashSetWithExpectedSize(intervals.size());
+    intervalSet.addAll(intervals);
+    this.sortedIntervals = new ArrayList<>(intervals.size());
+    this.sortedIntervals.addAll(intervalSet);
+    this.sortedIntervals.sort(Comparators.intervalsByStartThenEnd());

Review comment:
       It seems that `IntervalIterator` would work only when `sortedIntervals` don't overlap each other because the intervals returned from `IntervalIterator` may not be sorted otherwise. Is this correct? Then, please add a sanity check after dedup to make sure there is no overlap.

##########
File path: indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
##########
@@ -109,10 +109,10 @@ public boolean run()
       groupByJob.setOutputValueClass(NullWritable.class);
       groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
       groupByJob.setPartitionerClass(DetermineHashedPartitionsPartitioner.class);
-      if (!config.getSegmentGranularIntervals().isPresent()) {
+      if (!config.getSegmentGranularIntervals().iterator().hasNext()) {

Review comment:
       Same comment for other places where it calls `iterator.hasNext()` for the same purpose.

##########
File path: indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJobTest.java
##########
@@ -99,7 +98,7 @@ public void testRunWithSingleDimensionPartitionsSpecCreateHashBasedNumberedShard
     final HadoopDruidIndexerConfig config = Mockito.mock(HadoopDruidIndexerConfig.class);
     Mockito.when(config.isDeterminingPartitions()).thenReturn(false);
     Mockito.when(config.getPartitionsSpec()).thenReturn(partitionsSpec);
-    Mockito.when(config.getSegmentGranularIntervals()).thenReturn(Optional.of(intervals));
+    Mockito.when(config.getSegmentGranularIntervals()).thenReturn(() -> intervals.iterator());

Review comment:
       nit: the lambda is redundant since `intervals` is an `Iterable`.

##########
File path: core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.granularity;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.Sets;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Produce a stream of intervals generated by a given set of intervals as input and a given
+ * granularity. This class avoids materializing the granularity intervals whenever possible.
+ */
+public class IntervalsByGranularity
+{
+  private final List<Interval> sortedIntervals;
+  private final Granularity granularity;
+
+  public IntervalsByGranularity(Collection<Interval> intervals, Granularity granularity)
+  {
+    // eliminate dups & sort intervals:
+    HashSet<Interval> intervalSet = Sets.newHashSetWithExpectedSize(intervals.size());
+    intervalSet.addAll(intervals);
+    this.sortedIntervals = new ArrayList<>(intervals.size());
+    this.sortedIntervals.addAll(intervalSet);
+    this.sortedIntervals.sort(Comparators.intervalsByStartThenEnd());
+
+    this.granularity = granularity;
+  }
+
+  public Iterator<Interval> granularityIntervalsIterator()
+  {
+    Iterator<Interval> ite;
+    if (sortedIntervals.isEmpty()) {
+      ite = Collections.emptyIterator();
+    } else {
+      ite = new IntervalIterator(sortedIntervals);
+    }
+    return ite;
+  }
+
+  private class IntervalIterator implements Iterator<Interval>
+  {
+    private final List<Interval> sortedIntervals;
+    private int currentInterval;
+    private volatile PeekingIterator<Interval> currentIterator;
+    private Interval previous;
+
+    public IntervalIterator(List<Interval> sortedIntervals)
+    {
+      this.sortedIntervals = sortedIntervals;
+      this.currentInterval = 0;
+      currentIterator =
+          Iterators.peekingIterator(granularity.getIterable(sortedIntervals.get(currentInterval)).iterator());
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      if (currentIterator != null) {
+        boolean advance = false;
+        while (true) {
+
+          // is it time to move to the next iterator?
+          if (advance) {
+            if (currentInterval < sortedIntervals.size() - 1) {
+              currentIterator =
+                  Iterators.peekingIterator(granularity.getIterable(sortedIntervals.get(++currentInterval)).iterator());
+              advance = false;
+            } else {
+              break;
+            }
+          }
+
+          // check if this iterator has more elements:
+          if (currentIterator.hasNext()) {
+
+            // drop all subsequent intervals that are the same as the previous...
+            while (previous != null && previous.equals(currentIterator.peek())) {

Review comment:
       Does the iterator created from `granularity.getIterable()` ever return the same interval? It doesn't seem like so and this check seems unnecessary.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -1062,7 +1064,7 @@ SegmentIdWithShardSpec allocateNewSegment(DateTime timestamp) throws IOException
       }
 
       interval = maybeInterval.get();
-      if (!bucketIntervals.get().contains(interval)) {
+      if (!Iterators.contains(bucketIntervals.iterator(), interval)) {

Review comment:
       This method is called whenever subtasks need to allocate a new segment via the supervisor task. As a result, this code is never called in the Overlord. It might be better to do this check without materialization for less memory pressure, but will degrade ingestion performance. I suggest to keep the current behaviour of materializing all bucket intervals here because I'm not sure what is the best way to handle this yet. We need to think more about how to fix the OOM error in the task as a follow-up.

##########
File path: server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java
##########
@@ -41,11 +40,11 @@
 public interface GranularitySpec
 {
   /**
-   * Set of all time groups, broken up on segment boundaries. Should be sorted by interval start and non-overlapping.
+   * Iterable all time groups, broken up on segment boundaries. Should be sorted by interval start and non-overlapping.
    *
-   * @return set of all time groups
+   * @return Iterable of all time groups
    */
-  Optional<SortedSet<Interval>> bucketIntervals();
+  Iterable<Interval> bucketIntervals();

Review comment:
       Suggest `sortedBucketIntervals()` to make it clear the results are sorted.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -320,12 +320,12 @@ PartialRangeSegmentGenerateParallelIndexTaskRunner createPartialRangeSegmentGene
   )
   {
     return new PartialRangeSegmentGenerateParallelIndexTaskRunner(
-       toolbox,
-       getId(),
-       getGroupId(),
-       ingestionSchema,
-       getContext(),
-       intervalToPartitions
+        toolbox,

Review comment:
       nit: I think this PR is OK, but just FYI, it's usually recommended to not fix the code style unless you are modifying that area because it makes hard to find what are the real changes of the PR.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -373,23 +373,25 @@ private boolean tryLockWithDetermineResult(TaskActionClient client, LockGranular
     }
   }
 
+
   protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> intervals) throws IOException
   {
-
-    // In this case, the intervals to lock must be aligned with segmentGranularity if it's defined
-    final Iterator<Interval> intervalIterator;
+    // The given intervals are first converted to align with segment granularity. This is because,
+    // when an overwriting task finds a version for a given input row, it expects the interval
+    // associated to each version to be equal or larger than the time bucket where the input row falls in.
+    // See ParallelIndexSupervisorTask.findVersion().
+    final Set<Interval> uniqueCondensedIntervals = new HashSet<>();
     final Granularity segmentGranularity = getSegmentGranularity();
     if (segmentGranularity == null) {
-      Set<Interval> uniqueIntervals = new HashSet<>(intervals);
-      ArrayList<Interval> condensedIntervals = JodaUtils.condenseIntervals(() -> uniqueIntervals.iterator());
-      intervalIterator = condensedIntervals.iterator();
+      uniqueCondensedIntervals.addAll(JodaUtils.condenseIntervals(intervals));
     } else {
       IntervalsByGranularity intervalsByGranularity = new IntervalsByGranularity(intervals, segmentGranularity);
-      intervalIterator = intervalsByGranularity.granularityIntervalsIterator();
+      // the following is calling a condense that does not materialize the intervals:
+      uniqueCondensedIntervals.addAll(JodaUtils.condenseIntervals(intervalsByGranularity.granularityIntervalsIterator()));

Review comment:
       > Yes, `addAll` is using the materialized intervals being returned by the `condenseIntervals` call. The assumption here is that the condensed intervals are much fewer than the bucket intervals. I think this assumption will hold in the majority of real cases. I think that in a rare corner case where the original intervals cannot be condensed then we still would have issues, but not in the majority of production cases.
   
   I agree but it doesn't seem hard to avoid materialization. How about adding a new method `condensedIntervalsIterator()` which returns an iterator that lazily computes condensed intervals? It could be something like this:
   
   ```java
     public static Iterator<Interval> condensedIntervalsIterator(Iterator<Interval> sortedIntervals)
     {
       if (!sortedIntervals.hasNext()) {
         return Iterators.emptyIterator();
       }
   
       final PeekingIterator<Interval> peekingIterator = Iterators.peekingIterator(sortedIntervals);
       return new Iterator<Interval>()
       {
         @Override
         public boolean hasNext()
         {
           return peekingIterator.hasNext();
         }
   
         @Override
         public Interval next()
         {
           if (!hasNext()) {
             throw new NoSuchElementException();
           }
           Interval currInterval = peekingIterator.next();
           while (peekingIterator.hasNext()) {
             Interval next = peekingIterator.peek();
   
             if (currInterval.abuts(next)) {
               currInterval = new Interval(currInterval.getStart(), next.getEnd());
               peekingIterator.next();
             } else if (currInterval.overlaps(next)) {
               DateTime nextEnd = next.getEnd();
               DateTime currEnd = currInterval.getEnd();
               currInterval = new Interval(
                   currInterval.getStart(),
                   nextEnd.isAfter(currEnd) ? nextEnd : currEnd
               );
               peekingIterator.next();
             } else {
               break;
             }
           }
           return currInterval;
         }
       };
     }
   ```
   
   Then, `tryTimeChunkLock()` doesn't have to materialize intervals at all after condensing.
   
   ```java
     protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> intervals) throws IOException
     {
       // The given intervals are first converted to align with segment granularity. This is because,
       // when an overwriting task finds a version for a given input row, it expects the interval
       // associated to each version to be equal or larger than the time bucket where the input row falls in.
       // See ParallelIndexSupervisorTask.findVersion().
       final Iterator<Interval> intervalIterator;
       final Granularity segmentGranularity = getSegmentGranularity();
       if (segmentGranularity == null) {
         intervalIterator = JodaUtils.condenseIntervals(intervals).iterator();
       } else {
         IntervalsByGranularity intervalsByGranularity = new IntervalsByGranularity(intervals, segmentGranularity);
         // the following is calling a condense that does not materialize the intervals:
         intervalIterator = JodaUtils.condensedIntervalsIterator(intervalsByGranularity.granularityIntervalsIterator());
       }
   
       // Intervals are already condensed to avoid creating too many locks.
       // Intervals are also sorted and thus it's safe to compare only the previous interval and current one for dedup.
       Interval prev = null;
       while (intervalIterator.hasNext()) {
         final Interval cur = intervalIterator.next();
         if (prev != null && cur.equals(prev)) {
           continue;
         }
         prev = cur;
         final TaskLock lock = client.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, cur));
         if (lock == null) {
           return false;
         }
       }
       return true;
     }
   ```

##########
File path: server/src/main/java/org/apache/druid/segment/indexing/LookupIntervalBuckets.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.indexing;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import java.util.Iterator;
+import java.util.TreeSet;
+
+public class LookupIntervalBuckets

Review comment:
       Please add some javadoc explaining what this class does and what is for.

##########
File path: core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.granularity;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.Sets;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Produce a stream of intervals generated by a given set of intervals as input and a given
+ * granularity. This class avoids materializing the granularity intervals whenever possible.
+ */
+public class IntervalsByGranularity
+{
+  private final List<Interval> sortedIntervals;
+  private final Granularity granularity;
+
+  public IntervalsByGranularity(Collection<Interval> intervals, Granularity granularity)
+  {
+    // eliminate dups & sort intervals:
+    HashSet<Interval> intervalSet = Sets.newHashSetWithExpectedSize(intervals.size());
+    intervalSet.addAll(intervals);
+    this.sortedIntervals = new ArrayList<>(intervals.size());
+    this.sortedIntervals.addAll(intervalSet);
+    this.sortedIntervals.sort(Comparators.intervalsByStartThenEnd());
+
+    this.granularity = granularity;
+  }
+
+  public Iterator<Interval> granularityIntervalsIterator()
+  {
+    Iterator<Interval> ite;
+    if (sortedIntervals.isEmpty()) {
+      ite = Collections.emptyIterator();
+    } else {
+      ite = new IntervalIterator(sortedIntervals);

Review comment:
       Does `IntervalIterator` just flatten the nested iterators and concat them? If this is the case, I suggest to use `FluentIterable.from(sortedIntervals).transformAndConcat(interval -> granularity.getIterable(interval))` instead because it's better to use a well-tested library than inventing another wheel.

##########
File path: core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java
##########
@@ -50,15 +58,31 @@
         sortedIntervals.add(interval);
       }
     }
+    return condenseIntervals(sortedIntervals.iterator());
+  }
 
-    if (sortedIntervals.isEmpty()) {
-      return new ArrayList<>();
+  /**
+   * This method does not materialize the intervals represented by the
+   * sortedIntervals iterator. However, caller needs to insure that sortedIntervals
+   * is already sorted in ascending order.

Review comment:
       How about adding a sanity check in the method, so that ti can break if this expectation doesn't meet? Otherwise, it will be hard to debug if something goes wrong in the future.

##########
File path: indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
##########
@@ -109,10 +109,10 @@ public boolean run()
       groupByJob.setOutputValueClass(NullWritable.class);
       groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
       groupByJob.setPartitionerClass(DetermineHashedPartitionsPartitioner.class);
-      if (!config.getSegmentGranularIntervals().isPresent()) {
+      if (!config.getSegmentGranularIntervals().iterator().hasNext()) {

Review comment:
       This code just wants to know whether `inputIntervals` are set or not. Suggest to add a new method `hasInputIntervals()` in `GranularitySpec` for better readability.

##########
File path: indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJobTest.java
##########
@@ -59,7 +58,7 @@ public void testRunWithHashedPartitionsSpecCreateHashBasedNumberedShardSpecWithH
     final HadoopDruidIndexerConfig config = Mockito.mock(HadoopDruidIndexerConfig.class);
     Mockito.when(config.isDeterminingPartitions()).thenReturn(false);
     Mockito.when(config.getPartitionsSpec()).thenReturn(partitionsSpec);
-    Mockito.when(config.getSegmentGranularIntervals()).thenReturn(Optional.of(intervals));
+    Mockito.when(config.getSegmentGranularIntervals()).thenReturn(() -> intervals.iterator());

Review comment:
       nit: the lambda is redundant since `intervals` is an `Iterable`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r565775481



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -320,12 +320,12 @@ PartialRangeSegmentGenerateParallelIndexTaskRunner createPartialRangeSegmentGene
   )
   {
     return new PartialRangeSegmentGenerateParallelIndexTaskRunner(
-       toolbox,
-       getId(),
-       getGroupId(),
-       ingestionSchema,
-       getContext(),
-       intervalToPartitions
+        toolbox,

Review comment:
       Probably that was an automatic intelli-j change.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r564941129



##########
File path: core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.granularity;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.Sets;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Produce a stream of intervals generated by a given set of intervals as input and a given
+ * granularity. This class avoids materializing the granularity intervals whenever possible.
+ */
+public class IntervalsByGranularity
+{
+  private final List<Interval> sortedIntervals;
+  private final Granularity granularity;
+
+  public IntervalsByGranularity(Collection<Interval> intervals, Granularity granularity)
+  {
+    // eliminate dups & sort intervals:
+    HashSet<Interval> intervalSet = Sets.newHashSetWithExpectedSize(intervals.size());
+    intervalSet.addAll(intervals);
+    this.sortedIntervals = new ArrayList<>(intervals.size());
+    this.sortedIntervals.addAll(intervalSet);
+    this.sortedIntervals.sort(Comparators.intervalsByStartThenEnd());

Review comment:
       Great catch! I am now condensing the intervals in the constructor to eliminate overlaps... I tried to add more unit tests to catch corner cases since condensing can be tricky... we will review this closer together




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson commented on pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #10742:
URL: https://github.com/apache/druid/pull/10742#issuecomment-767309426


   Forgot to mention one more thing. I think this PR won't fix all OOM errors in the Overlord and the next place where we should fix is likely `TaskLockbox` that manages all task locks per interval. However, I would say this PR is a good start to look into the OOM errors in the Overlord :slightly_smiling_face:


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r564679000



##########
File path: core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.granularity;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.Sets;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Produce a stream of intervals generated by a given set of intervals as input and a given
+ * granularity. This class avoids materializing the granularity intervals whenever possible.
+ */
+public class IntervalsByGranularity
+{
+  private final List<Interval> sortedIntervals;
+  private final Granularity granularity;
+
+  public IntervalsByGranularity(Collection<Interval> intervals, Granularity granularity)
+  {
+    // eliminate dups & sort intervals:
+    HashSet<Interval> intervalSet = Sets.newHashSetWithExpectedSize(intervals.size());
+    intervalSet.addAll(intervals);
+    this.sortedIntervals = new ArrayList<>(intervals.size());
+    this.sortedIntervals.addAll(intervalSet);
+    this.sortedIntervals.sort(Comparators.intervalsByStartThenEnd());
+
+    this.granularity = granularity;
+  }
+
+  public Iterator<Interval> granularityIntervalsIterator()
+  {
+    Iterator<Interval> ite;
+    if (sortedIntervals.isEmpty()) {
+      ite = Collections.emptyIterator();
+    } else {
+      ite = new IntervalIterator(sortedIntervals);
+    }
+    return ite;
+  }
+
+  private class IntervalIterator implements Iterator<Interval>
+  {
+    private final List<Interval> sortedIntervals;
+    private int currentInterval;
+    private volatile PeekingIterator<Interval> currentIterator;
+    private Interval previous;
+
+    public IntervalIterator(List<Interval> sortedIntervals)
+    {
+      this.sortedIntervals = sortedIntervals;
+      this.currentInterval = 0;
+      currentIterator =
+          Iterators.peekingIterator(granularity.getIterable(sortedIntervals.get(currentInterval)).iterator());
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      if (currentIterator != null) {
+        boolean advance = false;
+        while (true) {
+
+          // is it time to move to the next iterator?
+          if (advance) {
+            if (currentInterval < sortedIntervals.size() - 1) {
+              currentIterator =
+                  Iterators.peekingIterator(granularity.getIterable(sortedIntervals.get(++currentInterval)).iterator());
+              advance = false;
+            } else {
+              break;
+            }
+          }
+
+          // check if this iterator has more elements:
+          if (currentIterator.hasNext()) {
+
+            // drop all subsequent intervals that are the same as the previous...
+            while (previous != null && previous.equals(currentIterator.peek())) {
+              currentIterator.next(); // drop it like it's hot
+              if (!currentIterator.hasNext()) {
+                advance = true;
+                break;
+              }
+            }
+            if (advance) {
+              continue; // no more here, get to the next iterator...
+            }
+
+            return true; // there are more
+
+          } else {
+            advance = true; // no more
+          }
+
+        } // while true
+      }
+      return false;
+    }

Review comment:
       The issue was that test class in the wrong project... moved to core-druid and fixed it... now coverage is ok




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r557576226



##########
File path: server/src/main/java/org/apache/druid/segment/indexing/LookupIntervalBuckets.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.indexing;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import java.util.Iterator;
+import java.util.TreeSet;
+
+public class LookupIntervalBuckets
+{
+  private final Iterable<Interval> intervalIterable;
+  private final TreeSet<Interval> intervals;
+
+  public LookupIntervalBuckets(Iterable<Interval> intervalIterable)

Review comment:
       I added this class to more cleanly re-use the TreeSet logic for bucket extraction among the `ArbitraryGranularitySpec` and the `UniformGranularitySpec`. The code in this PR goal is to avoid materialization of intervals in the Overlord only. We will deal with materialization issues elsewhere later. Thus when code outside the Overlord needs to get a bucket for a particuar `DateTime` then the code will actually materialize all the intervals and stores them in a fast lookup data structure (like `TreeSet`) to return the bucket. We could still iterate through all the intervals and find the bucket (since the intervals are sorted) but that would be linear in the number of the intervals for every call. This is not acceptable given the frequency of calling the method to find the bucket. We feel we can improve on this but we decided to divide the work and at least for now prevent the materialization of intervals in the overlord (see PR description also).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r562029029



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
##########
@@ -106,7 +106,8 @@
     );
   }
 
-  @VisibleForTesting  // Only for testing
+  @VisibleForTesting
+    // Only for testing

Review comment:
       Ok... that comment has been there for a long time... the only reason it shows as my change is because of automatic formatting from Intelli-j. I will remove the message in next commit.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r564978098



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -1062,7 +1064,7 @@ SegmentIdWithShardSpec allocateNewSegment(DateTime timestamp) throws IOException
       }
 
       interval = maybeInterval.get();
-      if (!bucketIntervals.get().contains(interval)) {
+      if (!Iterators.contains(bucketIntervals.iterator(), interval)) {

Review comment:
       I will materialize them then




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r564614500



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -373,23 +373,25 @@ private boolean tryLockWithDetermineResult(TaskActionClient client, LockGranular
     }
   }
 
+
   protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> intervals) throws IOException
   {
-
-    // In this case, the intervals to lock must be aligned with segmentGranularity if it's defined
-    final Iterator<Interval> intervalIterator;
+    // The given intervals are first converted to align with segment granularity. This is because,
+    // when an overwriting task finds a version for a given input row, it expects the interval
+    // associated to each version to be equal or larger than the time bucket where the input row falls in.
+    // See ParallelIndexSupervisorTask.findVersion().
+    final Set<Interval> uniqueCondensedIntervals = new HashSet<>();
     final Granularity segmentGranularity = getSegmentGranularity();
     if (segmentGranularity == null) {
-      Set<Interval> uniqueIntervals = new HashSet<>(intervals);
-      ArrayList<Interval> condensedIntervals = JodaUtils.condenseIntervals(() -> uniqueIntervals.iterator());
-      intervalIterator = condensedIntervals.iterator();
+      uniqueCondensedIntervals.addAll(JodaUtils.condenseIntervals(intervals));
     } else {
       IntervalsByGranularity intervalsByGranularity = new IntervalsByGranularity(intervals, segmentGranularity);
-      intervalIterator = intervalsByGranularity.granularityIntervalsIterator();
+      // the following is calling a condense that does not materialize the intervals:
+      uniqueCondensedIntervals.addAll(JodaUtils.condenseIntervals(intervalsByGranularity.granularityIntervalsIterator()));

Review comment:
       Hmm, on the second thought, I think the current code is good enough. However, please add some comment explaining why it is OK to materialize intervals here. I think your comment above would be nice. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] maytasm commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r561439141



##########
File path: server/src/test/java/org/apache/druid/segment/indexing/granularity/IntervalsByGranularityTest.java
##########
@@ -76,6 +79,21 @@ public void testDups()
     Assert.assertTrue(count == 61);
   }
 
+
+  @Test
+  public void testCondenseDoesNotMaterialize()

Review comment:
       how does this test that condense does not materialize?

##########
File path: core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java
##########
@@ -50,15 +49,25 @@
         sortedIntervals.add(interval);
       }
     }
+    return condenseIntervals(sortedIntervals.iterator());
+  }
 
-    if (sortedIntervals.isEmpty()) {
-      return new ArrayList<>();
+  /**
+   * Caller needs to insure that sortedIntervals is sorted in ascending order

Review comment:
       Can you add comment that this function does not materialize the input intervals and how it does that.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
##########
@@ -106,7 +106,8 @@
     );
   }
 
-  @VisibleForTesting  // Only for testing
+  @VisibleForTesting
+    // Only for testing

Review comment:
       Comment not needed as the annotation is saying the same thing

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -373,23 +373,25 @@ private boolean tryLockWithDetermineResult(TaskActionClient client, LockGranular
     }
   }
 
+
   protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> intervals) throws IOException
   {
-
-    // In this case, the intervals to lock must be aligned with segmentGranularity if it's defined
-    final Iterator<Interval> intervalIterator;
+    // The given intervals are first converted to align with segment granularity. This is because,
+    // when an overwriting task finds a version for a given input row, it expects the interval
+    // associated to each version to be equal or larger than the time bucket where the input row falls in.
+    // See ParallelIndexSupervisorTask.findVersion().
+    final Set<Interval> uniqueCondensedIntervals = new HashSet<>();
     final Granularity segmentGranularity = getSegmentGranularity();
     if (segmentGranularity == null) {
-      Set<Interval> uniqueIntervals = new HashSet<>(intervals);
-      ArrayList<Interval> condensedIntervals = JodaUtils.condenseIntervals(() -> uniqueIntervals.iterator());
-      intervalIterator = condensedIntervals.iterator();
+      uniqueCondensedIntervals.addAll(JodaUtils.condenseIntervals(intervals));
     } else {
       IntervalsByGranularity intervalsByGranularity = new IntervalsByGranularity(intervals, segmentGranularity);
-      intervalIterator = intervalsByGranularity.granularityIntervalsIterator();
+      // the following is calling a condense that does not materialize the intervals:
+      uniqueCondensedIntervals.addAll(JodaUtils.condenseIntervals(intervalsByGranularity.granularityIntervalsIterator()));

Review comment:
       doesn't addAll materialize all the intervals? Is materializing the condensed intervals fine as long as we are not materializing the original intervals?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] maytasm commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r579592454



##########
File path: core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.granularity;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.Sets;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Produce a stream of intervals generated by a given set of intervals as input and a given
+ * granularity. This class avoids materializing the granularity intervals whenever possible.
+ */
+public class IntervalsByGranularity
+{
+  private final List<Interval> sortedIntervals;
+  private final Granularity granularity;
+
+  public IntervalsByGranularity(Collection<Interval> intervals, Granularity granularity)
+  {
+    // eliminate dups & sort intervals:
+    HashSet<Interval> intervalSet = Sets.newHashSetWithExpectedSize(intervals.size());
+    intervalSet.addAll(intervals);
+    this.sortedIntervals = new ArrayList<>(intervals.size());
+    this.sortedIntervals.addAll(intervalSet);
+    this.sortedIntervals.sort(Comparators.intervalsByStartThenEnd());

Review comment:
       @jihoonson @loquisgon How can we be sure that intervals don't overlap each?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r564882937



##########
File path: core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java
##########
@@ -50,15 +58,31 @@
         sortedIntervals.add(interval);
       }
     }
+    return condenseIntervals(sortedIntervals.iterator());
+  }
 
-    if (sortedIntervals.isEmpty()) {
-      return new ArrayList<>();
+  /**
+   * This method does not materialize the intervals represented by the
+   * sortedIntervals iterator. However, caller needs to insure that sortedIntervals
+   * is already sorted in ascending order.

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r565774749



##########
File path: server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java
##########
@@ -41,11 +40,11 @@
 public interface GranularitySpec
 {
   /**
-   * Set of all time groups, broken up on segment boundaries. Should be sorted by interval start and non-overlapping.
+   * Iterable all time groups, broken up on segment boundaries. Should be sorted by interval start and non-overlapping.
    *
-   * @return set of all time groups
+   * @return Iterable of all time groups
    */
-  Optional<SortedSet<Interval>> bucketIntervals();
+  Iterable<Interval> bucketIntervals();

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] suneet-s merged pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
suneet-s merged pull request #10742:
URL: https://github.com/apache/druid/pull/10742


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r562027501



##########
File path: server/src/test/java/org/apache/druid/segment/indexing/granularity/IntervalsByGranularityTest.java
##########
@@ -76,6 +79,21 @@ public void testDups()
     Assert.assertTrue(count == 61);
   }
 
+
+  @Test
+  public void testCondenseDoesNotMaterialize()

Review comment:
       It only does indirectly, if it materialized it would create 1 year worth of seconds amount of intervals (over 31 million). If it materialized I reasoned that it would slow down the test or something. I can remove it or rename it. Let me know if you think of a better way to test that. I don't want to measure memory before and after using the `Runtime.getRuntime()` methods because they are not very precise either.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
##########
@@ -106,7 +106,8 @@
     );
   }
 
-  @VisibleForTesting  // Only for testing
+  @VisibleForTesting
+    // Only for testing

Review comment:
       Ok... that comment has been there for a long time... the only reason it shows as my change is because of automatic formatting from Intelli-j. I will remove the message in next commit.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -373,23 +373,25 @@ private boolean tryLockWithDetermineResult(TaskActionClient client, LockGranular
     }
   }
 
+
   protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> intervals) throws IOException
   {
-
-    // In this case, the intervals to lock must be aligned with segmentGranularity if it's defined
-    final Iterator<Interval> intervalIterator;
+    // The given intervals are first converted to align with segment granularity. This is because,
+    // when an overwriting task finds a version for a given input row, it expects the interval
+    // associated to each version to be equal or larger than the time bucket where the input row falls in.
+    // See ParallelIndexSupervisorTask.findVersion().
+    final Set<Interval> uniqueCondensedIntervals = new HashSet<>();
     final Granularity segmentGranularity = getSegmentGranularity();
     if (segmentGranularity == null) {
-      Set<Interval> uniqueIntervals = new HashSet<>(intervals);
-      ArrayList<Interval> condensedIntervals = JodaUtils.condenseIntervals(() -> uniqueIntervals.iterator());
-      intervalIterator = condensedIntervals.iterator();
+      uniqueCondensedIntervals.addAll(JodaUtils.condenseIntervals(intervals));
     } else {
       IntervalsByGranularity intervalsByGranularity = new IntervalsByGranularity(intervals, segmentGranularity);
-      intervalIterator = intervalsByGranularity.granularityIntervalsIterator();
+      // the following is calling a condense that does not materialize the intervals:
+      uniqueCondensedIntervals.addAll(JodaUtils.condenseIntervals(intervalsByGranularity.granularityIntervalsIterator()));

Review comment:
       Yes, `addAll` is using the materialized intervals being returned by the `condenseIntervals` call. The assumption here is that the condensed intervals are much fewer than the bucket intervals. I think this assumption will hold in the majority of real cases. I think that in a rare corner case where the original intervals cannot be condensed then we still would have issues, but not in the majority of production cases.

##########
File path: core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java
##########
@@ -50,15 +49,25 @@
         sortedIntervals.add(interval);
       }
     }
+    return condenseIntervals(sortedIntervals.iterator());
+  }
 
-    if (sortedIntervals.isEmpty()) {
-      return new ArrayList<>();
+  /**
+   * Caller needs to insure that sortedIntervals is sorted in ascending order

Review comment:
       Fixed in next commit.

##########
File path: server/src/test/java/org/apache/druid/segment/indexing/granularity/IntervalsByGranularityTest.java
##########
@@ -76,6 +79,21 @@ public void testDups()
     Assert.assertTrue(count == 61);
   }
 
+
+  @Test
+  public void testCondenseDoesNotMaterialize()

Review comment:
       I changed the name of the method and added a comment inside the method to indicate intention..




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r562044660



##########
File path: core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java
##########
@@ -50,15 +49,25 @@
         sortedIntervals.add(interval);
       }
     }
+    return condenseIntervals(sortedIntervals.iterator());
+  }
 
-    if (sortedIntervals.isEmpty()) {
-      return new ArrayList<>();
+  /**
+   * Caller needs to insure that sortedIntervals is sorted in ascending order

Review comment:
       Fixed in next commit.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r564977372



##########
File path: core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.granularity;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.Sets;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Produce a stream of intervals generated by a given set of intervals as input and a given
+ * granularity. This class avoids materializing the granularity intervals whenever possible.
+ */
+public class IntervalsByGranularity
+{
+  private final List<Interval> sortedIntervals;
+  private final Granularity granularity;
+
+  public IntervalsByGranularity(Collection<Interval> intervals, Granularity granularity)
+  {
+    // eliminate dups & sort intervals:
+    HashSet<Interval> intervalSet = Sets.newHashSetWithExpectedSize(intervals.size());
+    intervalSet.addAll(intervals);
+    this.sortedIntervals = new ArrayList<>(intervals.size());
+    this.sortedIntervals.addAll(intervalSet);
+    this.sortedIntervals.sort(Comparators.intervalsByStartThenEnd());
+
+    this.granularity = granularity;
+  }
+
+  public Iterator<Interval> granularityIntervalsIterator()
+  {
+    Iterator<Interval> ite;
+    if (sortedIntervals.isEmpty()) {
+      ite = Collections.emptyIterator();
+    } else {
+      ite = new IntervalIterator(sortedIntervals);

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r565763900



##########
File path: indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJobTest.java
##########
@@ -99,7 +98,7 @@ public void testRunWithSingleDimensionPartitionsSpecCreateHashBasedNumberedShard
     final HadoopDruidIndexerConfig config = Mockito.mock(HadoopDruidIndexerConfig.class);
     Mockito.when(config.isDeterminingPartitions()).thenReturn(false);
     Mockito.when(config.getPartitionsSpec()).thenReturn(partitionsSpec);
-    Mockito.when(config.getSegmentGranularIntervals()).thenReturn(Optional.of(intervals));
+    Mockito.when(config.getSegmentGranularIntervals()).thenReturn(() -> intervals.iterator());

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r564940783



##########
File path: core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.granularity;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.Sets;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Produce a stream of intervals generated by a given set of intervals as input and a given
+ * granularity. This class avoids materializing the granularity intervals whenever possible.
+ */
+public class IntervalsByGranularity
+{
+  private final List<Interval> sortedIntervals;
+  private final Granularity granularity;
+
+  public IntervalsByGranularity(Collection<Interval> intervals, Granularity granularity)
+  {
+    // eliminate dups & sort intervals:
+    HashSet<Interval> intervalSet = Sets.newHashSetWithExpectedSize(intervals.size());
+    intervalSet.addAll(intervals);
+    this.sortedIntervals = new ArrayList<>(intervals.size());
+    this.sortedIntervals.addAll(intervalSet);
+    this.sortedIntervals.sort(Comparators.intervalsByStartThenEnd());
+
+    this.granularity = granularity;
+  }
+
+  public Iterator<Interval> granularityIntervalsIterator()
+  {
+    Iterator<Interval> ite;
+    if (sortedIntervals.isEmpty()) {
+      ite = Collections.emptyIterator();
+    } else {
+      ite = new IntervalIterator(sortedIntervals);
+    }
+    return ite;
+  }
+
+  private class IntervalIterator implements Iterator<Interval>
+  {
+    private final List<Interval> sortedIntervals;
+    private int currentInterval;
+    private volatile PeekingIterator<Interval> currentIterator;
+    private Interval previous;
+
+    public IntervalIterator(List<Interval> sortedIntervals)
+    {
+      this.sortedIntervals = sortedIntervals;
+      this.currentInterval = 0;
+      currentIterator =
+          Iterators.peekingIterator(granularity.getIterable(sortedIntervals.get(currentInterval)).iterator());
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      if (currentIterator != null) {
+        boolean advance = false;
+        while (true) {
+
+          // is it time to move to the next iterator?
+          if (advance) {
+            if (currentInterval < sortedIntervals.size() - 1) {
+              currentIterator =
+                  Iterators.peekingIterator(granularity.getIterable(sortedIntervals.get(++currentInterval)).iterator());
+              advance = false;
+            } else {
+              break;
+            }
+          }
+
+          // check if this iterator has more elements:
+          if (currentIterator.hasNext()) {
+
+            // drop all subsequent intervals that are the same as the previous...
+            while (previous != null && previous.equals(currentIterator.peek())) {

Review comment:
       I put comments in the code to illustrate when this can happen (also I have unit tests to catch this)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r565762407



##########
File path: indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
##########
@@ -109,10 +109,10 @@ public boolean run()
       groupByJob.setOutputValueClass(NullWritable.class);
       groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
       groupByJob.setPartitionerClass(DetermineHashedPartitionsPartitioner.class);
-      if (!config.getSegmentGranularIntervals().isPresent()) {
+      if (!config.getSegmentGranularIntervals().iterator().hasNext()) {

Review comment:
       Rather than adding a new method I decided to use `GranularitySpec::inputIntervals().isEmpty()`. Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r565773059



##########
File path: server/src/main/java/org/apache/druid/segment/indexing/LookupIntervalBuckets.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.indexing;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import java.util.Iterator;
+import java.util.TreeSet;
+
+public class LookupIntervalBuckets

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r557784750



##########
File path: indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java
##########
@@ -314,9 +313,9 @@ public void setShardSpecs(Map<Long, List<HadoopyShardSpec>> shardSpecs)
 
   public Optional<List<Interval>> getIntervals()
   {
-    Optional<SortedSet<Interval>> setOptional = schema.getDataSchema().getGranularitySpec().bucketIntervals();
-    if (setOptional.isPresent()) {
-      return Optional.of(JodaUtils.condenseIntervals(setOptional.get()));
+    Iterable<Interval> bucketIntervals = schema.getDataSchema().getGranularitySpec().bucketIntervals();
+    if (bucketIntervals.iterator().hasNext()) {
+      return Optional.of(JodaUtils.condenseIntervals(schema.getDataSchema().getGranularitySpec().bucketIntervals()));

Review comment:
       Fixed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] suneet-s commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
suneet-s commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r562319912



##########
File path: core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.granularity;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.Sets;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Produce a stream of intervals generated by a given set of intervals as input and a given
+ * granularity. This class avoids materializing the granularity intervals whenever possible.
+ */
+public class IntervalsByGranularity
+{
+  private final List<Interval> sortedIntervals;
+  private final Granularity granularity;
+
+  public IntervalsByGranularity(Collection<Interval> intervals, Granularity granularity)
+  {
+    // eliminate dups & sort intervals:
+    HashSet<Interval> intervalSet = Sets.newHashSetWithExpectedSize(intervals.size());
+    intervalSet.addAll(intervals);
+    this.sortedIntervals = new ArrayList<>(intervals.size());
+    this.sortedIntervals.addAll(intervalSet);
+    this.sortedIntervals.sort(Comparators.intervalsByStartThenEnd());
+
+    this.granularity = granularity;
+  }
+
+  public Iterator<Interval> granularityIntervalsIterator()
+  {
+    Iterator<Interval> ite;
+    if (sortedIntervals.isEmpty()) {
+      ite = Collections.emptyIterator();
+    } else {
+      ite = new IntervalIterator(sortedIntervals);
+    }
+    return ite;
+  }
+
+  private class IntervalIterator implements Iterator<Interval>
+  {
+    private final List<Interval> sortedIntervals;
+    private int currentInterval;
+    private volatile PeekingIterator<Interval> currentIterator;
+    private Interval previous;
+
+    public IntervalIterator(List<Interval> sortedIntervals)
+    {
+      this.sortedIntervals = sortedIntervals;
+      this.currentInterval = 0;
+      currentIterator =
+          Iterators.peekingIterator(granularity.getIterable(sortedIntervals.get(currentInterval)).iterator());
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      if (currentIterator != null) {
+        boolean advance = false;
+        while (true) {
+
+          // is it time to move to the next iterator?
+          if (advance) {
+            if (currentInterval < sortedIntervals.size() - 1) {
+              currentIterator =
+                  Iterators.peekingIterator(granularity.getIterable(sortedIntervals.get(++currentInterval)).iterator());
+              advance = false;
+            } else {
+              break;
+            }
+          }
+
+          // check if this iterator has more elements:
+          if (currentIterator.hasNext()) {
+
+            // drop all subsequent intervals that are the same as the previous...
+            while (previous != null && previous.equals(currentIterator.peek())) {
+              currentIterator.next(); // drop it like it's hot
+              if (!currentIterator.hasNext()) {
+                advance = true;
+                break;
+              }
+            }
+            if (advance) {
+              continue; // no more here, get to the next iterator...
+            }
+
+            return true; // there are more
+
+          } else {
+            advance = true; // no more
+          }
+
+        } // while true
+      }
+      return false;
+    }

Review comment:
       Looks like CI is complaining about a lack of test coverage for this class - how is this being tested?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] suneet-s commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
suneet-s commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r562319912



##########
File path: core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.granularity;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.Sets;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Produce a stream of intervals generated by a given set of intervals as input and a given
+ * granularity. This class avoids materializing the granularity intervals whenever possible.
+ */
+public class IntervalsByGranularity
+{
+  private final List<Interval> sortedIntervals;
+  private final Granularity granularity;
+
+  public IntervalsByGranularity(Collection<Interval> intervals, Granularity granularity)
+  {
+    // eliminate dups & sort intervals:
+    HashSet<Interval> intervalSet = Sets.newHashSetWithExpectedSize(intervals.size());
+    intervalSet.addAll(intervals);
+    this.sortedIntervals = new ArrayList<>(intervals.size());
+    this.sortedIntervals.addAll(intervalSet);
+    this.sortedIntervals.sort(Comparators.intervalsByStartThenEnd());
+
+    this.granularity = granularity;
+  }
+
+  public Iterator<Interval> granularityIntervalsIterator()
+  {
+    Iterator<Interval> ite;
+    if (sortedIntervals.isEmpty()) {
+      ite = Collections.emptyIterator();
+    } else {
+      ite = new IntervalIterator(sortedIntervals);
+    }
+    return ite;
+  }
+
+  private class IntervalIterator implements Iterator<Interval>
+  {
+    private final List<Interval> sortedIntervals;
+    private int currentInterval;
+    private volatile PeekingIterator<Interval> currentIterator;
+    private Interval previous;
+
+    public IntervalIterator(List<Interval> sortedIntervals)
+    {
+      this.sortedIntervals = sortedIntervals;
+      this.currentInterval = 0;
+      currentIterator =
+          Iterators.peekingIterator(granularity.getIterable(sortedIntervals.get(currentInterval)).iterator());
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      if (currentIterator != null) {
+        boolean advance = false;
+        while (true) {
+
+          // is it time to move to the next iterator?
+          if (advance) {
+            if (currentInterval < sortedIntervals.size() - 1) {
+              currentIterator =
+                  Iterators.peekingIterator(granularity.getIterable(sortedIntervals.get(++currentInterval)).iterator());
+              advance = false;
+            } else {
+              break;
+            }
+          }
+
+          // check if this iterator has more elements:
+          if (currentIterator.hasNext()) {
+
+            // drop all subsequent intervals that are the same as the previous...
+            while (previous != null && previous.equals(currentIterator.peek())) {
+              currentIterator.next(); // drop it like it's hot
+              if (!currentIterator.hasNext()) {
+                advance = true;
+                break;
+              }
+            }
+            if (advance) {
+              continue; // no more here, get to the next iterator...
+            }
+
+            return true; // there are more
+
+          } else {
+            advance = true; // no more
+          }
+
+        } // while true
+      }
+      return false;
+    }

Review comment:
       Looks like CI is complaining about a lack of test coverage for this class - how is this being tested?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] maytasm commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r561439141



##########
File path: server/src/test/java/org/apache/druid/segment/indexing/granularity/IntervalsByGranularityTest.java
##########
@@ -76,6 +79,21 @@ public void testDups()
     Assert.assertTrue(count == 61);
   }
 
+
+  @Test
+  public void testCondenseDoesNotMaterialize()

Review comment:
       how does this test that condense does not materialize?

##########
File path: core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java
##########
@@ -50,15 +49,25 @@
         sortedIntervals.add(interval);
       }
     }
+    return condenseIntervals(sortedIntervals.iterator());
+  }
 
-    if (sortedIntervals.isEmpty()) {
-      return new ArrayList<>();
+  /**
+   * Caller needs to insure that sortedIntervals is sorted in ascending order

Review comment:
       Can you add comment that this function does not materialize the input intervals and how it does that.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
##########
@@ -106,7 +106,8 @@
     );
   }
 
-  @VisibleForTesting  // Only for testing
+  @VisibleForTesting
+    // Only for testing

Review comment:
       Comment not needed as the annotation is saying the same thing

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -373,23 +373,25 @@ private boolean tryLockWithDetermineResult(TaskActionClient client, LockGranular
     }
   }
 
+
   protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> intervals) throws IOException
   {
-
-    // In this case, the intervals to lock must be aligned with segmentGranularity if it's defined
-    final Iterator<Interval> intervalIterator;
+    // The given intervals are first converted to align with segment granularity. This is because,
+    // when an overwriting task finds a version for a given input row, it expects the interval
+    // associated to each version to be equal or larger than the time bucket where the input row falls in.
+    // See ParallelIndexSupervisorTask.findVersion().
+    final Set<Interval> uniqueCondensedIntervals = new HashSet<>();
     final Granularity segmentGranularity = getSegmentGranularity();
     if (segmentGranularity == null) {
-      Set<Interval> uniqueIntervals = new HashSet<>(intervals);
-      ArrayList<Interval> condensedIntervals = JodaUtils.condenseIntervals(() -> uniqueIntervals.iterator());
-      intervalIterator = condensedIntervals.iterator();
+      uniqueCondensedIntervals.addAll(JodaUtils.condenseIntervals(intervals));
     } else {
       IntervalsByGranularity intervalsByGranularity = new IntervalsByGranularity(intervals, segmentGranularity);
-      intervalIterator = intervalsByGranularity.granularityIntervalsIterator();
+      // the following is calling a condense that does not materialize the intervals:
+      uniqueCondensedIntervals.addAll(JodaUtils.condenseIntervals(intervalsByGranularity.granularityIntervalsIterator()));

Review comment:
       doesn't addAll materialize all the intervals? Is materializing the condensed intervals fine as long as we are not materializing the original intervals?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r562047065



##########
File path: server/src/test/java/org/apache/druid/segment/indexing/granularity/IntervalsByGranularityTest.java
##########
@@ -76,6 +79,21 @@ public void testDups()
     Assert.assertTrue(count == 61);
   }
 
+
+  @Test
+  public void testCondenseDoesNotMaterialize()

Review comment:
       I changed the name of the method and added a comment inside the method to indicate intention..




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r557783333



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -398,22 +375,21 @@ private boolean tryLockWithDetermineResult(TaskActionClient client, LockGranular
 
   protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> intervals) throws IOException
   {
-    // The given intervals are first converted to align with segment granularity. This is because,
-    // when an overwriting task finds a version for a given input row, it expects the interval
-    // associated to each version to be equal or larger than the time bucket where the input row falls in.
-    // See ParallelIndexSupervisorTask.findVersion().
-    final Set<Interval> uniqueIntervals = new HashSet<>();
+
+    // In this case, the intervals to lock must be aligned with segmentGranularity if it's defined
+    final Iterator<Interval> intervalIterator;
     final Granularity segmentGranularity = getSegmentGranularity();
-    for (Interval interval : intervals) {
-      if (segmentGranularity == null) {
-        uniqueIntervals.add(interval);
-      } else {
-        Iterables.addAll(uniqueIntervals, segmentGranularity.getIterable(interval));
-      }
+    if (segmentGranularity == null) {
+      Set<Interval> uniqueIntervals = new HashSet<>(intervals);
+      ArrayList<Interval> condensedIntervals = JodaUtils.condenseIntervals(() -> uniqueIntervals.iterator());
+      intervalIterator = condensedIntervals.iterator();
+    } else {
+      IntervalsByGranularity intervalsByGranularity = new IntervalsByGranularity(intervals, segmentGranularity);

Review comment:
       I will fix this in the next commit




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r555931298



##########
File path: server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java
##########
@@ -57,18 +60,17 @@ public UniformGranularitySpec(
     this.segmentGranularity = segmentGranularity == null ? DEFAULT_SEGMENT_GRANULARITY : segmentGranularity;
 
     if (inputIntervals != null) {
-      List<Interval> granularIntervals = new ArrayList<>();
-      for (Interval inputInterval : inputIntervals) {
-        Iterables.addAll(granularIntervals, this.segmentGranularity.getIterable(inputInterval));
-      }
-      this.inputIntervals = ImmutableList.copyOf(inputIntervals);
-      this.wrappedSpec = new ArbitraryGranularitySpec(queryGranularity, rollup, granularIntervals);
+      // sort them

Review comment:
       Contract indicates that inputIntervals must be left "as is".... will remove sorting in next commit




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] maytasm commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r557248198



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -398,22 +375,21 @@ private boolean tryLockWithDetermineResult(TaskActionClient client, LockGranular
 
   protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> intervals) throws IOException
   {
-    // The given intervals are first converted to align with segment granularity. This is because,
-    // when an overwriting task finds a version for a given input row, it expects the interval
-    // associated to each version to be equal or larger than the time bucket where the input row falls in.
-    // See ParallelIndexSupervisorTask.findVersion().
-    final Set<Interval> uniqueIntervals = new HashSet<>();
+
+    // In this case, the intervals to lock must be aligned with segmentGranularity if it's defined
+    final Iterator<Interval> intervalIterator;
     final Granularity segmentGranularity = getSegmentGranularity();
-    for (Interval interval : intervals) {
-      if (segmentGranularity == null) {
-        uniqueIntervals.add(interval);
-      } else {
-        Iterables.addAll(uniqueIntervals, segmentGranularity.getIterable(interval));
-      }
+    if (segmentGranularity == null) {
+      Set<Interval> uniqueIntervals = new HashSet<>(intervals);
+      ArrayList<Interval> condensedIntervals = JodaUtils.condenseIntervals(() -> uniqueIntervals.iterator());
+      intervalIterator = condensedIntervals.iterator();
+    } else {
+      IntervalsByGranularity intervalsByGranularity = new IntervalsByGranularity(intervals, segmentGranularity);

Review comment:
       Are we no longer condensing the intervals if segmentGranularity != null?

##########
File path: indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java
##########
@@ -314,9 +313,9 @@ public void setShardSpecs(Map<Long, List<HadoopyShardSpec>> shardSpecs)
 
   public Optional<List<Interval>> getIntervals()
   {
-    Optional<SortedSet<Interval>> setOptional = schema.getDataSchema().getGranularitySpec().bucketIntervals();
-    if (setOptional.isPresent()) {
-      return Optional.of(JodaUtils.condenseIntervals(setOptional.get()));
+    Iterable<Interval> bucketIntervals = schema.getDataSchema().getGranularitySpec().bucketIntervals();
+    if (bucketIntervals.iterator().hasNext()) {
+      return Optional.of(JodaUtils.condenseIntervals(schema.getDataSchema().getGranularitySpec().bucketIntervals()));

Review comment:
       nit: can reuse bucketIntervals variable here

##########
File path: server/src/main/java/org/apache/druid/segment/indexing/LookupIntervalBuckets.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.indexing;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import java.util.Iterator;
+import java.util.TreeSet;
+
+public class LookupIntervalBuckets
+{
+  private final Iterable<Interval> intervalIterable;
+  private final TreeSet<Interval> intervals;
+
+  public LookupIntervalBuckets(Iterable<Interval> intervalIterable)

Review comment:
       Is this class basically something like a lazy load of the TreeSet?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r564080150



##########
File path: core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.granularity;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.Sets;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Produce a stream of intervals generated by a given set of intervals as input and a given
+ * granularity. This class avoids materializing the granularity intervals whenever possible.
+ */
+public class IntervalsByGranularity
+{
+  private final List<Interval> sortedIntervals;
+  private final Granularity granularity;
+
+  public IntervalsByGranularity(Collection<Interval> intervals, Granularity granularity)
+  {
+    // eliminate dups & sort intervals:
+    HashSet<Interval> intervalSet = Sets.newHashSetWithExpectedSize(intervals.size());
+    intervalSet.addAll(intervals);
+    this.sortedIntervals = new ArrayList<>(intervals.size());
+    this.sortedIntervals.addAll(intervalSet);
+    this.sortedIntervals.sort(Comparators.intervalsByStartThenEnd());
+
+    this.granularity = granularity;
+  }
+
+  public Iterator<Interval> granularityIntervalsIterator()
+  {
+    Iterator<Interval> ite;
+    if (sortedIntervals.isEmpty()) {
+      ite = Collections.emptyIterator();
+    } else {
+      ite = new IntervalIterator(sortedIntervals);
+    }
+    return ite;
+  }
+
+  private class IntervalIterator implements Iterator<Interval>
+  {
+    private final List<Interval> sortedIntervals;
+    private int currentInterval;
+    private volatile PeekingIterator<Interval> currentIterator;
+    private Interval previous;
+
+    public IntervalIterator(List<Interval> sortedIntervals)
+    {
+      this.sortedIntervals = sortedIntervals;
+      this.currentInterval = 0;
+      currentIterator =
+          Iterators.peekingIterator(granularity.getIterable(sortedIntervals.get(currentInterval)).iterator());
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      if (currentIterator != null) {
+        boolean advance = false;
+        while (true) {
+
+          // is it time to move to the next iterator?
+          if (advance) {
+            if (currentInterval < sortedIntervals.size() - 1) {
+              currentIterator =
+                  Iterators.peekingIterator(granularity.getIterable(sortedIntervals.get(++currentInterval)).iterator());
+              advance = false;
+            } else {
+              break;
+            }
+          }
+
+          // check if this iterator has more elements:
+          if (currentIterator.hasNext()) {
+
+            // drop all subsequent intervals that are the same as the previous...
+            while (previous != null && previous.equals(currentIterator.peek())) {
+              currentIterator.next(); // drop it like it's hot
+              if (!currentIterator.hasNext()) {
+                advance = true;
+                break;
+              }
+            }
+            if (advance) {
+              continue; // no more here, get to the next iterator...
+            }
+
+            return true; // there are more
+
+          } else {
+            advance = true; // no more
+          }
+
+        } // while true
+      }
+      return false;
+    }

Review comment:
       This is being sufficiently tested (verified locally by intelli-j coverage) but apparently I need to move the class to some other place so the build picks the coverage (based on team feedback today)... I will look into this and also in the other build failures




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r565762407



##########
File path: indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
##########
@@ -109,10 +109,10 @@ public boolean run()
       groupByJob.setOutputValueClass(NullWritable.class);
       groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
       groupByJob.setPartitionerClass(DetermineHashedPartitionsPartitioner.class);
-      if (!config.getSegmentGranularIntervals().isPresent()) {
+      if (!config.getSegmentGranularIntervals().iterator().hasNext()) {

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r555457289



##########
File path: server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java
##########
@@ -45,7 +46,7 @@
    *
    * @return set of all time groups

Review comment:
       I will cleanup the javadoc in next commit




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r557576226



##########
File path: server/src/main/java/org/apache/druid/segment/indexing/LookupIntervalBuckets.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.indexing;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import java.util.Iterator;
+import java.util.TreeSet;
+
+public class LookupIntervalBuckets
+{
+  private final Iterable<Interval> intervalIterable;
+  private final TreeSet<Interval> intervals;
+
+  public LookupIntervalBuckets(Iterable<Interval> intervalIterable)

Review comment:
       I added this class to more cleanly re-use the TreeSet logic for bucket extraction among the `ArbitraryGranularitySpec` and the `UniformGranularitySpec`. The code in this PR goal is to avoid materialization of intervals in the Overlord only. We will deal with materialization issues elsewhere later. Thus when code outside the Overlord needs to get a bucket for a particuar `DateTime` then the code will actually materialize all the intervals and stores them in a fast lookup data structure (like `TreeSet`) to return the bucket. In the `UniformGranularitySpec`, we could still iterate through all the intervals and find the bucket (since the intervals are sorted) but that would be linear in the number of the intervals for every call. This is not acceptable given the frequency of calling the method to find the bucket. We feel we can improve on this but we decided to divide the work and at least for now prevent the materialization of intervals in the overlord (see PR description also).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r555991064



##########
File path: server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java
##########
@@ -45,7 +46,7 @@
    *
    * @return set of all time groups

Review comment:
       done

##########
File path: server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java
##########
@@ -57,18 +60,17 @@ public UniformGranularitySpec(
     this.segmentGranularity = segmentGranularity == null ? DEFAULT_SEGMENT_GRANULARITY : segmentGranularity;
 
     if (inputIntervals != null) {
-      List<Interval> granularIntervals = new ArrayList<>();
-      for (Interval inputInterval : inputIntervals) {
-        Iterables.addAll(granularIntervals, this.segmentGranularity.getIterable(inputInterval));
-      }
-      this.inputIntervals = ImmutableList.copyOf(inputIntervals);
-      this.wrappedSpec = new ArbitraryGranularitySpec(queryGranularity, rollup, granularIntervals);
+      // sort them

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r562031987



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -373,23 +373,25 @@ private boolean tryLockWithDetermineResult(TaskActionClient client, LockGranular
     }
   }
 
+
   protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> intervals) throws IOException
   {
-
-    // In this case, the intervals to lock must be aligned with segmentGranularity if it's defined
-    final Iterator<Interval> intervalIterator;
+    // The given intervals are first converted to align with segment granularity. This is because,
+    // when an overwriting task finds a version for a given input row, it expects the interval
+    // associated to each version to be equal or larger than the time bucket where the input row falls in.
+    // See ParallelIndexSupervisorTask.findVersion().
+    final Set<Interval> uniqueCondensedIntervals = new HashSet<>();
     final Granularity segmentGranularity = getSegmentGranularity();
     if (segmentGranularity == null) {
-      Set<Interval> uniqueIntervals = new HashSet<>(intervals);
-      ArrayList<Interval> condensedIntervals = JodaUtils.condenseIntervals(() -> uniqueIntervals.iterator());
-      intervalIterator = condensedIntervals.iterator();
+      uniqueCondensedIntervals.addAll(JodaUtils.condenseIntervals(intervals));
     } else {
       IntervalsByGranularity intervalsByGranularity = new IntervalsByGranularity(intervals, segmentGranularity);
-      intervalIterator = intervalsByGranularity.granularityIntervalsIterator();
+      // the following is calling a condense that does not materialize the intervals:
+      uniqueCondensedIntervals.addAll(JodaUtils.condenseIntervals(intervalsByGranularity.granularityIntervalsIterator()));

Review comment:
       Yes, `addAll` is using the materialized intervals being returned by the `condenseIntervals` call. The assumption here is that the condensed intervals are much fewer than the bucket intervals. I think this assumption will hold in the majority of real cases. I think that in a rare corner case where the original intervals cannot be condensed then we still would have issues, but not in the majority of production cases.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r564941129



##########
File path: core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.granularity;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.Sets;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Produce a stream of intervals generated by a given set of intervals as input and a given
+ * granularity. This class avoids materializing the granularity intervals whenever possible.
+ */
+public class IntervalsByGranularity
+{
+  private final List<Interval> sortedIntervals;
+  private final Granularity granularity;
+
+  public IntervalsByGranularity(Collection<Interval> intervals, Granularity granularity)
+  {
+    // eliminate dups & sort intervals:
+    HashSet<Interval> intervalSet = Sets.newHashSetWithExpectedSize(intervals.size());
+    intervalSet.addAll(intervals);
+    this.sortedIntervals = new ArrayList<>(intervals.size());
+    this.sortedIntervals.addAll(intervalSet);
+    this.sortedIntervals.sort(Comparators.intervalsByStartThenEnd());

Review comment:
       Great catch! I will add the sanity check




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r557783333



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -398,22 +375,21 @@ private boolean tryLockWithDetermineResult(TaskActionClient client, LockGranular
 
   protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> intervals) throws IOException
   {
-    // The given intervals are first converted to align with segment granularity. This is because,
-    // when an overwriting task finds a version for a given input row, it expects the interval
-    // associated to each version to be equal or larger than the time bucket where the input row falls in.
-    // See ParallelIndexSupervisorTask.findVersion().
-    final Set<Interval> uniqueIntervals = new HashSet<>();
+
+    // In this case, the intervals to lock must be aligned with segmentGranularity if it's defined
+    final Iterator<Interval> intervalIterator;
     final Granularity segmentGranularity = getSegmentGranularity();
-    for (Interval interval : intervals) {
-      if (segmentGranularity == null) {
-        uniqueIntervals.add(interval);
-      } else {
-        Iterables.addAll(uniqueIntervals, segmentGranularity.getIterable(interval));
-      }
+    if (segmentGranularity == null) {
+      Set<Interval> uniqueIntervals = new HashSet<>(intervals);
+      ArrayList<Interval> condensedIntervals = JodaUtils.condenseIntervals(() -> uniqueIntervals.iterator());
+      intervalIterator = condensedIntervals.iterator();
+    } else {
+      IntervalsByGranularity intervalsByGranularity = new IntervalsByGranularity(intervals, segmentGranularity);

Review comment:
       Fixed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r564080150



##########
File path: core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.granularity;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.Sets;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Produce a stream of intervals generated by a given set of intervals as input and a given
+ * granularity. This class avoids materializing the granularity intervals whenever possible.
+ */
+public class IntervalsByGranularity
+{
+  private final List<Interval> sortedIntervals;
+  private final Granularity granularity;
+
+  public IntervalsByGranularity(Collection<Interval> intervals, Granularity granularity)
+  {
+    // eliminate dups & sort intervals:
+    HashSet<Interval> intervalSet = Sets.newHashSetWithExpectedSize(intervals.size());
+    intervalSet.addAll(intervals);
+    this.sortedIntervals = new ArrayList<>(intervals.size());
+    this.sortedIntervals.addAll(intervalSet);
+    this.sortedIntervals.sort(Comparators.intervalsByStartThenEnd());
+
+    this.granularity = granularity;
+  }
+
+  public Iterator<Interval> granularityIntervalsIterator()
+  {
+    Iterator<Interval> ite;
+    if (sortedIntervals.isEmpty()) {
+      ite = Collections.emptyIterator();
+    } else {
+      ite = new IntervalIterator(sortedIntervals);
+    }
+    return ite;
+  }
+
+  private class IntervalIterator implements Iterator<Interval>
+  {
+    private final List<Interval> sortedIntervals;
+    private int currentInterval;
+    private volatile PeekingIterator<Interval> currentIterator;
+    private Interval previous;
+
+    public IntervalIterator(List<Interval> sortedIntervals)
+    {
+      this.sortedIntervals = sortedIntervals;
+      this.currentInterval = 0;
+      currentIterator =
+          Iterators.peekingIterator(granularity.getIterable(sortedIntervals.get(currentInterval)).iterator());
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      if (currentIterator != null) {
+        boolean advance = false;
+        while (true) {
+
+          // is it time to move to the next iterator?
+          if (advance) {
+            if (currentInterval < sortedIntervals.size() - 1) {
+              currentIterator =
+                  Iterators.peekingIterator(granularity.getIterable(sortedIntervals.get(++currentInterval)).iterator());
+              advance = false;
+            } else {
+              break;
+            }
+          }
+
+          // check if this iterator has more elements:
+          if (currentIterator.hasNext()) {
+
+            // drop all subsequent intervals that are the same as the previous...
+            while (previous != null && previous.equals(currentIterator.peek())) {
+              currentIterator.next(); // drop it like it's hot
+              if (!currentIterator.hasNext()) {
+                advance = true;
+                break;
+              }
+            }
+            if (advance) {
+              continue; // no more here, get to the next iterator...
+            }
+
+            return true; // there are more
+
+          } else {
+            advance = true; // no more
+          }
+
+        } // while true
+      }
+      return false;
+    }

Review comment:
       This is being sufficiently tested (verified locally by intelli-j coverage) but apparently I need to move the class to some other place so the build picks the coverage (based on team feedback today)... I will look into this and also in the other build failures




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] loquisgon commented on a change in pull request #10742: Granularity interval materialization

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r564678335



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -373,23 +373,25 @@ private boolean tryLockWithDetermineResult(TaskActionClient client, LockGranular
     }
   }
 
+
   protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> intervals) throws IOException
   {
-
-    // In this case, the intervals to lock must be aligned with segmentGranularity if it's defined
-    final Iterator<Interval> intervalIterator;
+    // The given intervals are first converted to align with segment granularity. This is because,
+    // when an overwriting task finds a version for a given input row, it expects the interval
+    // associated to each version to be equal or larger than the time bucket where the input row falls in.
+    // See ParallelIndexSupervisorTask.findVersion().
+    final Set<Interval> uniqueCondensedIntervals = new HashSet<>();
     final Granularity segmentGranularity = getSegmentGranularity();
     if (segmentGranularity == null) {
-      Set<Interval> uniqueIntervals = new HashSet<>(intervals);
-      ArrayList<Interval> condensedIntervals = JodaUtils.condenseIntervals(() -> uniqueIntervals.iterator());
-      intervalIterator = condensedIntervals.iterator();
+      uniqueCondensedIntervals.addAll(JodaUtils.condenseIntervals(intervals));
     } else {
       IntervalsByGranularity intervalsByGranularity = new IntervalsByGranularity(intervals, segmentGranularity);
-      intervalIterator = intervalsByGranularity.granularityIntervalsIterator();
+      // the following is calling a condense that does not materialize the intervals:
+      uniqueCondensedIntervals.addAll(JodaUtils.condenseIntervals(intervalsByGranularity.granularityIntervalsIterator()));

Review comment:
       No need to write the new method that does not materialize intervals... it is already done by the `JodaUtils` `condenseIntervals` method that takes an iterator...




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org