Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2019/03/28 00:15:59 UTC

[GitHub] [incubator-druid] justinborromeo commented on a change in pull request #7133: Time Ordering On Scans

justinborromeo commented on a change in pull request #7133: Time Ordering On Scans
URL: https://github.com/apache/incubator-druid/pull/7133#discussion_r269818704
 
 

 ##########
 File path: processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
 ##########
 @@ -68,34 +89,224 @@ public ScanQueryRunnerFactory(
   )
   {
     // in single thread and in jetty thread instead of processing thread
-    return new QueryRunner<ScanResultValue>()
-    {
-      @Override
-      public Sequence<ScanResultValue> run(
-          final QueryPlus<ScanResultValue> queryPlus, final Map<String, Object> responseContext
-      )
-      {
-        // Note: this variable is effective only when queryContext has a timeout.
-        // See the comment of CTX_TIMEOUT_AT.
-        final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery());
-        responseContext.put(CTX_TIMEOUT_AT, timeoutAt);
-        return Sequences.concat(
+    return (queryPlus, responseContext) -> {
+      ScanQuery query = (ScanQuery) queryPlus.getQuery();
+
+      // Note: this variable takes effect only when the queryContext has a timeout.
+      // See the comment on CTX_TIMEOUT_AT.
+      final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery());
+      responseContext.put(CTX_TIMEOUT_AT, timeoutAt);
+
+      if (query.getOrder().equals(ScanQuery.Order.NONE)) {
+        // Use normal strategy
+        Sequence<ScanResultValue> returnedRows = Sequences.concat(
             Sequences.map(
                 Sequences.simple(queryRunners),
-                new Function<QueryRunner<ScanResultValue>, Sequence<ScanResultValue>>()
-                {
-                  @Override
-                  public Sequence<ScanResultValue> apply(final QueryRunner<ScanResultValue> input)
-                  {
-                    return input.run(queryPlus, responseContext);
-                  }
-                }
+                input -> input.run(queryPlus, responseContext)
             )
         );
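+        // Sequence#limit() currently takes an int, so the limit can only be applied when it fits in one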
+        if (query.getLimit() <= Integer.MAX_VALUE) {
+          return returnedRows.limit(Math.toIntExact(query.getLimit()));
+        } else {
+          return returnedRows;
+        }
+      } else {
+        if (!(query.getQuerySegmentSpec() instanceof MultipleSpecificSegmentSpec)) {
+          throw new UOE("Time-ordering on scan queries is only supported for queries with segment specs"
+                        + "of type MultipleSpecificSegmentSpec");
+        }
+        List<SegmentDescriptor> descriptorsOrdered =
+            ((MultipleSpecificSegmentSpec) query.getQuerySegmentSpec()).getDescriptors(); // Ascending time order
+        List<QueryRunner<ScanResultValue>> queryRunnersOrdered = Lists.newArrayList(queryRunners); // Ascending time order by default
+
+        if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
+          descriptorsOrdered = Lists.reverse(descriptorsOrdered);
+          queryRunnersOrdered = Lists.reverse(queryRunnersOrdered);
+        }
+
+        if (query.getLimit() <= scanQueryConfig.getMaxRowsQueuedForOrdering()) {
+          // Use priority queue strategy
+          return priorityQueueSortAndLimit(
+              Sequences.concat(Sequences.map(
+                  Sequences.simple(queryRunnersOrdered),
+                  input -> input.run(queryPlus, responseContext)
+              )),
+              query,
+              descriptorsOrdered
+          );
+        } else {
+          Preconditions.checkState(
+              descriptorsOrdered.size() == queryRunnersOrdered.size(),
+              "Number of segment descriptors does not equal number of "
+              + "query runners...something went wrong!"
+          );
+
+          // Combine the two lists of segment descriptors and query runners into a single list of
+          // segment descriptor/query runner pairs.  This makes it easier to use stream operators.
+          List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>> descriptorsAndRunnersOrdered = new ArrayList<>();
+          for (int i = 0; i < queryRunnersOrdered.size(); i++) {
+            descriptorsAndRunnersOrdered.add(new Pair<>(descriptorsOrdered.get(i), queryRunnersOrdered.get(i)));
+          }
+
+          // Group the list of pairs by interval.  The LinkedHashMap maps each interval to the list of all the
+          // descriptor/runner pairs whose segments fall in that interval
+          LinkedHashMap<Interval, List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval =
+              descriptorsAndRunnersOrdered.stream()
+                                          .collect(Collectors.groupingBy(
+                                              x -> x.lhs.getInterval(),
+                                              LinkedHashMap::new,
+                                              Collectors.toList()
+                                          ));
+
+          // Find the segment with the largest number of partitions.  This will be compared against the
+          // maxSegmentPartitionsOrderedInMemory limit to determine whether the query risks consuming too much memory.
+          int maxNumPartitionsInSegment =
+              partitionsGroupedByInterval.values()
+                                         .stream()
+                                         .mapToInt(List::size)
+                                         .max()
+                                         .getAsInt();
+
+          if (maxNumPartitionsInSegment <= scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory()) {
+            // Use n-way merge strategy
+
+            // Create a list of grouped runner lists (each sublist, or "runner group", corresponds to one interval;
+            // the intervals should not overlap).  We create a list of lists so we can create a sequence of sequences.
+            // There's no easy way to convert a LinkedHashMap to a sequence because the map itself is not Iterable.
+            List<List<QueryRunner<ScanResultValue>>> groupedRunners =
+                partitionsGroupedByInterval.entrySet()
+                                           .stream()
+                                           .map(entry -> entry.getValue()
+                                                              .stream()
+                                                              .map(segQueryRunnerPair -> segQueryRunnerPair.rhs)
+                                                              .collect(Collectors.toList()))
+                                           .collect(Collectors.toList());
+
+            return nWayMergeAndLimit(groupedRunners, queryPlus, responseContext);
+          }
+          throw new UOE(
+              "Time ordering for queries of %,d partitions per segment and a row limit of %,d is not supported."
+              + "  Try reducing the scope of the query to scan fewer partitions than the configurable limit of"
+              + " %,d partitions or lower the row limit below %,d.",
+              maxNumPartitionsInSegment,
+              query.getLimit(),
+              scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory(),
+              scanQueryConfig.getMaxRowsQueuedForOrdering()
+          );
+        }
       }
     };
   }
 
+  @VisibleForTesting
+  Sequence<ScanResultValue> priorityQueueSortAndLimit(
+      Sequence<ScanResultValue> inputSequence,
+      ScanQuery scanQuery,
+      List<SegmentDescriptor> descriptorsOrdered
+  )
+  {
+    Comparator<ScanResultValue> priorityQComparator = new ScanResultValueTimestampComparator(scanQuery);
+
+    // Converting the limit from long to int could theoretically throw an ArithmeticException, but this branch
+    // only runs when limit <= scanQueryConfig.getMaxRowsQueuedForOrdering() (which should be well below Integer.MAX_VALUE)
+    int limit = Math.toIntExact(scanQuery.getLimit());
+
+    PriorityQueue<ScanResultValue> q = new PriorityQueue<>(limit, priorityQComparator);
+
+    Yielder<ScanResultValue> yielder = inputSequence.toYielder(
+        null,
+        new YieldingAccumulator<ScanResultValue, ScanResultValue>()
+        {
+          @Override
+          public ScanResultValue accumulate(ScanResultValue accumulated, ScanResultValue in)
+          {
+            yield();
+            return in;
+          }
+        }
+    );
+    boolean doneScanning = yielder.isDone();
+    // We need to scan "limit" rows, plus any rows remaining in the interval that contains the limit-th row
+    int numRowsScanned = 0;
+    Interval finalInterval = null;
+    while (!doneScanning) {
+      ScanResultValue next = yielder.get();
+      List<ScanResultValue> singleEventScanResultValues = next.toSingleEventScanResultValues();
+      for (ScanResultValue srv : singleEventScanResultValues) {
+        numRowsScanned++;
+        // Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list
+        // needs to be preserved for queries using the compactedList result format
+        q.offer(srv);
+        if (q.size() > limit) {
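+          // the queue now holds limit + 1 rows; evict the head (the row that sorts first under the
+          // comparator) so that at most the top "limit" rows are retained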
+          q.poll();
+        }
+
+        // Finish scanning the interval containing the limit row
+        if (numRowsScanned > limit && finalInterval == null) {
+          long timestampOfLimitRow = srv.getFirstEventTimestamp(scanQuery.getResultFormat());
+          for (SegmentDescriptor descriptor : descriptorsOrdered) {
+            if (descriptor.getInterval().contains(timestampOfLimitRow)) {
+              finalInterval = descriptor.getInterval();
+            }
+          }
+          if (finalInterval == null) {
+            throw new ISE("WTH???  Row came from an unscanned interval?");
+          }
+        }
+      }
+      yielder = yielder.next(null);
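+      // Stop once the input is exhausted, or once the batch we just processed started outside the
+      // interval containing the limit-th row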
+      doneScanning = yielder.isDone() ||
+                     (finalInterval != null &&
+                      !finalInterval.contains(next.getFirstEventTimestamp(scanQuery.getResultFormat())));
+    }
+    // Need to convert to a Deque because PriorityQueue's iterator doesn't guarantee that elements are
+    // visited in sorted order.  Deque was chosen over a List because its addFirst() is O(1).
+    final Deque<ScanResultValue> sortedElements = new ArrayDeque<>(q.size());
+    while (q.size() != 0) {
+      // addFirst is used since PriorityQueue#poll() dequeues the low-priority (timestamp-wise) events first.
+      sortedElements.addFirst(q.poll());
+    }
+    return Sequences.simple(sortedElements);
+  }
+
+  @VisibleForTesting
+  Sequence<ScanResultValue> nWayMergeAndLimit(
+      List<List<QueryRunner<ScanResultValue>>> groupedRunners,
+      QueryPlus<ScanResultValue> queryPlus,
+      Map<String, Object> responseContext
+  )
+  {
+    // Starting from the innermost Sequences.map:
+    // (1) Deaggregate each ScanResultValue returned by the query runners
+    // (2) Combine the deaggregated ScanResultValues into a single sequence
+    // (3) Create a sequence of results from each runner in the group and flatmerge based on timestamp
+    // (4) Create a sequence of results from each runner group
+    // (5) Join all the results into a single sequence
+
+    return Sequences.concat(
+        Sequences.map(
+            Sequences.simple(groupedRunners),
+            runnerGroup ->
+                Sequences.map(
+                    Sequences.simple(runnerGroup),
+                    (input) -> Sequences.concat(
+                        Sequences.map(
+                            input.run(queryPlus, responseContext),
+                            srv -> Sequences.simple(srv.toSingleEventScanResultValues())
+                        )
+                    )
+                ).flatMerge(
+                    seq -> seq,
+                    Ordering.from(new ScanResultValueTimestampComparator(
+                        (ScanQuery) queryPlus.getQuery()
+                    )).reverse()
+                )
+        )
+    ).limit(
+        Math.toIntExact(((ScanQuery) (queryPlus.getQuery())).getLimit())
 
 Review comment:
   I'll change the `Sequence#limit()` function to accept longs.  There shouldn't be any adverse consequences.
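
   For context, here's a minimal sketch of the idea, using a plain `Iterator` rather than Druid's `Sequence` machinery (the `LimitedIterator` class below is hypothetical, for illustration only): counting with a `long` instead of an `int` is all the change really requires.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical sketch: enforcing a long-typed row limit over a plain Iterator.
final class LimitedIterator<T> implements Iterator<T>
{
  private final Iterator<T> delegate;
  private final long limit; // a long, so limits above Integer.MAX_VALUE are representable
  private long served = 0;

  LimitedIterator(Iterator<T> delegate, long limit)
  {
    if (limit < 0) {
      throw new IllegalArgumentException("limit must be non-negative");
    }
    this.delegate = delegate;
    this.limit = limit;
  }

  @Override
  public boolean hasNext()
  {
    return served < limit && delegate.hasNext();
  }

  @Override
  public T next()
  {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    served++;
    return delegate.next();
  }

  public static void main(String[] args)
  {
    Iterator<Integer> it = new LimitedIterator<>(Arrays.asList(1, 2, 3, 4, 5).iterator(), 3L);
    while (it.hasNext()) {
      System.out.println(it.next()); // prints 1, 2, 3
    }
  }
}
```

   A limit above `Integer.MAX_VALUE` then simply never triggers early termination, which matches the current unlimited path, so the `Math.toIntExact()` calls above can go away.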

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org