You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/01/26 05:35:42 UTC

[GitHub] [druid] jihoonson commented on a change in pull request #10742: Granularity interval materialization

jihoonson commented on a change in pull request #10742:
URL: https://github.com/apache/druid/pull/10742#discussion_r564203826



##########
File path: core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java
##########
@@ -50,15 +58,31 @@
         sortedIntervals.add(interval);
       }
     }
+    return condenseIntervals(sortedIntervals.iterator());
+  }
 
-    if (sortedIntervals.isEmpty()) {
-      return new ArrayList<>();
+  /**
+   * This method does not materialize the intervals represented by the
+   * sortedIntervals iterator. However, caller needs to insure that sortedIntervals
+   * is already sorted in ascending order.

Review comment:
       Since `Interval` is not `Comparable` (it's not because there could be multiple ways to compare them), it would be nice to explicitly state the order should be `Comparators.intervalsByStartThenEnd()` to make it more clear.

##########
File path: core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.granularity;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.Sets;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Produce a stream of intervals generated by a given set of intervals as input and a given
+ * granularity. This class avoids materializing the granularity intervals whenever possible.
+ */
+public class IntervalsByGranularity
+{
+  private final List<Interval> sortedIntervals;
+  private final Granularity granularity;
+
+  public IntervalsByGranularity(Collection<Interval> intervals, Granularity granularity)
+  {
+    // eliminate dups & sort intervals:
+    HashSet<Interval> intervalSet = Sets.newHashSetWithExpectedSize(intervals.size());
+    intervalSet.addAll(intervals);
+    this.sortedIntervals = new ArrayList<>(intervals.size());
+    this.sortedIntervals.addAll(intervalSet);
+    this.sortedIntervals.sort(Comparators.intervalsByStartThenEnd());

Review comment:
       It seems that `IntervalIterator` would work only when `sortedIntervals` don't overlap each other because the intervals returned from `IntervalIterator` may not be sorted otherwise. Is this correct? Then, please add a sanity check after dedup to make sure there is no overlap.

##########
File path: indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
##########
@@ -109,10 +109,10 @@ public boolean run()
       groupByJob.setOutputValueClass(NullWritable.class);
       groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
       groupByJob.setPartitionerClass(DetermineHashedPartitionsPartitioner.class);
-      if (!config.getSegmentGranularIntervals().isPresent()) {
+      if (!config.getSegmentGranularIntervals().iterator().hasNext()) {

Review comment:
       Same comment for other places where it calls `iterator.hasNext()` for the same purpose.

##########
File path: indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJobTest.java
##########
@@ -99,7 +98,7 @@ public void testRunWithSingleDimensionPartitionsSpecCreateHashBasedNumberedShard
     final HadoopDruidIndexerConfig config = Mockito.mock(HadoopDruidIndexerConfig.class);
     Mockito.when(config.isDeterminingPartitions()).thenReturn(false);
     Mockito.when(config.getPartitionsSpec()).thenReturn(partitionsSpec);
-    Mockito.when(config.getSegmentGranularIntervals()).thenReturn(Optional.of(intervals));
+    Mockito.when(config.getSegmentGranularIntervals()).thenReturn(() -> intervals.iterator());

Review comment:
       nit: the lambda is redundant since `intervals` is an `Iterable`.

##########
File path: core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.granularity;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.Sets;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Produce a stream of intervals generated by a given set of intervals as input and a given
+ * granularity. This class avoids materializing the granularity intervals whenever possible.
+ */
+public class IntervalsByGranularity
+{
+  private final List<Interval> sortedIntervals;
+  private final Granularity granularity;
+
+  public IntervalsByGranularity(Collection<Interval> intervals, Granularity granularity)
+  {
+    // eliminate dups & sort intervals:
+    HashSet<Interval> intervalSet = Sets.newHashSetWithExpectedSize(intervals.size());
+    intervalSet.addAll(intervals);
+    this.sortedIntervals = new ArrayList<>(intervals.size());
+    this.sortedIntervals.addAll(intervalSet);
+    this.sortedIntervals.sort(Comparators.intervalsByStartThenEnd());
+
+    this.granularity = granularity;
+  }
+
+  public Iterator<Interval> granularityIntervalsIterator()
+  {
+    Iterator<Interval> ite;
+    if (sortedIntervals.isEmpty()) {
+      ite = Collections.emptyIterator();
+    } else {
+      ite = new IntervalIterator(sortedIntervals);
+    }
+    return ite;
+  }
+
+  private class IntervalIterator implements Iterator<Interval>
+  {
+    private final List<Interval> sortedIntervals;
+    private int currentInterval;
+    private volatile PeekingIterator<Interval> currentIterator;
+    private Interval previous;
+
+    public IntervalIterator(List<Interval> sortedIntervals)
+    {
+      this.sortedIntervals = sortedIntervals;
+      this.currentInterval = 0;
+      currentIterator =
+          Iterators.peekingIterator(granularity.getIterable(sortedIntervals.get(currentInterval)).iterator());
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      if (currentIterator != null) {
+        boolean advance = false;
+        while (true) {
+
+          // is it time to move to the next iterator?
+          if (advance) {
+            if (currentInterval < sortedIntervals.size() - 1) {
+              currentIterator =
+                  Iterators.peekingIterator(granularity.getIterable(sortedIntervals.get(++currentInterval)).iterator());
+              advance = false;
+            } else {
+              break;
+            }
+          }
+
+          // check if this iterator has more elements:
+          if (currentIterator.hasNext()) {
+
+            // drop all subsequent intervals that are the same as the previous...
+            while (previous != null && previous.equals(currentIterator.peek())) {

Review comment:
       Does the iterator created from `granularity.getIterable()` ever return the same interval? It doesn't seem like so and this check seems unnecessary.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -1062,7 +1064,7 @@ SegmentIdWithShardSpec allocateNewSegment(DateTime timestamp) throws IOException
       }
 
       interval = maybeInterval.get();
-      if (!bucketIntervals.get().contains(interval)) {
+      if (!Iterators.contains(bucketIntervals.iterator(), interval)) {

Review comment:
       This method is called whenever subtasks need to allocate a new segment via the supervisor task. As a result, this code is never called in the Overlord. It might be better to do this check without materialization for less memory pressure, but will degrade ingestion performance. I suggest to keep the current behaviour of materializing all bucket intervals here because I'm not sure what is the best way to handle this yet. We need to think more about how to fix the OOM error in the task as a follow-up.

##########
File path: server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java
##########
@@ -41,11 +40,11 @@
 public interface GranularitySpec
 {
   /**
-   * Set of all time groups, broken up on segment boundaries. Should be sorted by interval start and non-overlapping.
+   * Iterable all time groups, broken up on segment boundaries. Should be sorted by interval start and non-overlapping.
    *
-   * @return set of all time groups
+   * @return Iterable of all time groups
    */
-  Optional<SortedSet<Interval>> bucketIntervals();
+  Iterable<Interval> bucketIntervals();

Review comment:
       Suggest `sortedBucketIntervals()` to make it clear the results are sorted.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
##########
@@ -320,12 +320,12 @@ PartialRangeSegmentGenerateParallelIndexTaskRunner createPartialRangeSegmentGene
   )
   {
     return new PartialRangeSegmentGenerateParallelIndexTaskRunner(
-       toolbox,
-       getId(),
-       getGroupId(),
-       ingestionSchema,
-       getContext(),
-       intervalToPartitions
+        toolbox,

Review comment:
       nit: I think this PR is OK, but just FYI, it's usually recommended to not fix the code style unless you are modifying that area because it makes hard to find what are the real changes of the PR.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -373,23 +373,25 @@ private boolean tryLockWithDetermineResult(TaskActionClient client, LockGranular
     }
   }
 
+
   protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> intervals) throws IOException
   {
-
-    // In this case, the intervals to lock must be aligned with segmentGranularity if it's defined
-    final Iterator<Interval> intervalIterator;
+    // The given intervals are first converted to align with segment granularity. This is because,
+    // when an overwriting task finds a version for a given input row, it expects the interval
+    // associated to each version to be equal or larger than the time bucket where the input row falls in.
+    // See ParallelIndexSupervisorTask.findVersion().
+    final Set<Interval> uniqueCondensedIntervals = new HashSet<>();
     final Granularity segmentGranularity = getSegmentGranularity();
     if (segmentGranularity == null) {
-      Set<Interval> uniqueIntervals = new HashSet<>(intervals);
-      ArrayList<Interval> condensedIntervals = JodaUtils.condenseIntervals(() -> uniqueIntervals.iterator());
-      intervalIterator = condensedIntervals.iterator();
+      uniqueCondensedIntervals.addAll(JodaUtils.condenseIntervals(intervals));
     } else {
       IntervalsByGranularity intervalsByGranularity = new IntervalsByGranularity(intervals, segmentGranularity);
-      intervalIterator = intervalsByGranularity.granularityIntervalsIterator();
+      // the following is calling a condense that does not materialize the intervals:
+      uniqueCondensedIntervals.addAll(JodaUtils.condenseIntervals(intervalsByGranularity.granularityIntervalsIterator()));

Review comment:
       > Yes, `addAll` is using the materialized intervals being returned by the `condenseIntervals` call. The assumption here is that the condensed intervals are much fewer than the bucket intervals. I think this assumption will hold in the majority of real cases. I think that in a rare corner case where the original intervals cannot be condensed then we still would have issues, but not in the majority of production cases.
   
   I agree but it doesn't seem hard to avoid materialization. How about adding a new method `condensedIntervalsIterator()` which returns an iterator that lazily computes condensed intervals? It could be something like this:
   
   ```java
     public static Iterator<Interval> condensedIntervalsIterator(Iterator<Interval> sortedIntervals)
     {
       if (!sortedIntervals.hasNext()) {
         return Iterators.emptyIterator();
       }
   
       final PeekingIterator<Interval> peekingIterator = Iterators.peekingIterator(sortedIntervals);
       return new Iterator<Interval>()
       {
         @Override
         public boolean hasNext()
         {
           return peekingIterator.hasNext();
         }
   
         @Override
         public Interval next()
         {
           if (!hasNext()) {
             throw new NoSuchElementException();
           }
           Interval currInterval = peekingIterator.next();
           while (peekingIterator.hasNext()) {
             Interval next = peekingIterator.peek();
   
             if (currInterval.abuts(next)) {
               currInterval = new Interval(currInterval.getStart(), next.getEnd());
               peekingIterator.next();
             } else if (currInterval.overlaps(next)) {
               DateTime nextEnd = next.getEnd();
               DateTime currEnd = currInterval.getEnd();
               currInterval = new Interval(
                   currInterval.getStart(),
                   nextEnd.isAfter(currEnd) ? nextEnd : currEnd
               );
               peekingIterator.next();
             } else {
               break;
             }
           }
           return currInterval;
         }
       };
     }
   ```
   
   Then, `tryTimeChunkLock()` doesn't have to materialize intervals at all after condensing.
   
   ```java
     protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> intervals) throws IOException
     {
       // The given intervals are first converted to align with segment granularity. This is because,
       // when an overwriting task finds a version for a given input row, it expects the interval
       // associated to each version to be equal or larger than the time bucket where the input row falls in.
       // See ParallelIndexSupervisorTask.findVersion().
       final Iterator<Interval> intervalIterator;
       final Granularity segmentGranularity = getSegmentGranularity();
       if (segmentGranularity == null) {
         intervalIterator = JodaUtils.condenseIntervals(intervals).iterator();
       } else {
         IntervalsByGranularity intervalsByGranularity = new IntervalsByGranularity(intervals, segmentGranularity);
         // the following is calling a condense that does not materialize the intervals:
         intervalIterator = JodaUtils.condensedIntervalsIterator(intervalsByGranularity.granularityIntervalsIterator());
       }
   
       // Intervals are already condensed to avoid creating too many locks.
       // Intervals are also sorted and thus it's safe to compare only the previous interval and current one for dedup.
       Interval prev = null;
       while (intervalIterator.hasNext()) {
         final Interval cur = intervalIterator.next();
         if (prev != null && cur.equals(prev)) {
           continue;
         }
         prev = cur;
         final TaskLock lock = client.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, cur));
         if (lock == null) {
           return false;
         }
       }
       return true;
     }
   ```

##########
File path: server/src/main/java/org/apache/druid/segment/indexing/LookupIntervalBuckets.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.indexing;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import java.util.Iterator;
+import java.util.TreeSet;
+
+public class LookupIntervalBuckets

Review comment:
       Please add some javadoc explaining what this class does and what is for.

##########
File path: core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.granularity;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.Sets;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Produce a stream of intervals generated by a given set of intervals as input and a given
+ * granularity. This class avoids materializing the granularity intervals whenever possible.
+ */
+public class IntervalsByGranularity
+{
+  private final List<Interval> sortedIntervals;
+  private final Granularity granularity;
+
+  public IntervalsByGranularity(Collection<Interval> intervals, Granularity granularity)
+  {
+    // eliminate dups & sort intervals:
+    HashSet<Interval> intervalSet = Sets.newHashSetWithExpectedSize(intervals.size());
+    intervalSet.addAll(intervals);
+    this.sortedIntervals = new ArrayList<>(intervals.size());
+    this.sortedIntervals.addAll(intervalSet);
+    this.sortedIntervals.sort(Comparators.intervalsByStartThenEnd());
+
+    this.granularity = granularity;
+  }
+
+  public Iterator<Interval> granularityIntervalsIterator()
+  {
+    Iterator<Interval> ite;
+    if (sortedIntervals.isEmpty()) {
+      ite = Collections.emptyIterator();
+    } else {
+      ite = new IntervalIterator(sortedIntervals);

Review comment:
       Does `IntervalIterator` just flatten the nested iterators and concat them? If this is the case, I suggest to use `FluentIterable.from(sortedIntervals).transformAndConcat(interval -> granularity.getIterable(interval))` instead because it's better to use a well-tested library than inventing another wheel.

##########
File path: core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java
##########
@@ -50,15 +58,31 @@
         sortedIntervals.add(interval);
       }
     }
+    return condenseIntervals(sortedIntervals.iterator());
+  }
 
-    if (sortedIntervals.isEmpty()) {
-      return new ArrayList<>();
+  /**
+   * This method does not materialize the intervals represented by the
+   * sortedIntervals iterator. However, caller needs to insure that sortedIntervals
+   * is already sorted in ascending order.

Review comment:
       How about adding a sanity check in the method, so that ti can break if this expectation doesn't meet? Otherwise, it will be hard to debug if something goes wrong in the future.

##########
File path: indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
##########
@@ -109,10 +109,10 @@ public boolean run()
       groupByJob.setOutputValueClass(NullWritable.class);
       groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
       groupByJob.setPartitionerClass(DetermineHashedPartitionsPartitioner.class);
-      if (!config.getSegmentGranularIntervals().isPresent()) {
+      if (!config.getSegmentGranularIntervals().iterator().hasNext()) {

Review comment:
       This code just wants to know whether `inputIntervals` are set or not. Suggest to add a new method `hasInputIntervals()` in `GranularitySpec` for better readability.

##########
File path: indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJobTest.java
##########
@@ -59,7 +58,7 @@ public void testRunWithHashedPartitionsSpecCreateHashBasedNumberedShardSpecWithH
     final HadoopDruidIndexerConfig config = Mockito.mock(HadoopDruidIndexerConfig.class);
     Mockito.when(config.isDeterminingPartitions()).thenReturn(false);
     Mockito.when(config.getPartitionsSpec()).thenReturn(partitionsSpec);
-    Mockito.when(config.getSegmentGranularIntervals()).thenReturn(Optional.of(intervals));
+    Mockito.when(config.getSegmentGranularIntervals()).thenReturn(() -> intervals.iterator());

Review comment:
       nit: the lambda is redundant since `intervals` is an `Iterable`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org