You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2022/10/07 23:13:18 UTC

[GitHub] [druid] paul-rogers commented on a diff in pull request #13168: ScanQuery supports multi column orderBy queries

paul-rogers commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r990523689


##########
processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java:
##########
@@ -496,6 +523,22 @@ public ScanQuery withOverriddenContext(Map<String, Object> contextOverrides)
     return Druids.ScanQueryBuilder.copy(this).context(computeOverriddenContext(getContext(), contextOverrides)).build();
   }
 
+  public boolean scanOrderByNonTime()
+  {
+
+    if (orderBys.size() > 1 || (orderBys.size() == 1 && !ColumnHolder.TIME_COLUMN_NAME.equals(orderBys.get(0).getColumnName()))) {
+      //order by Ordinary column
+      return true;
+    }
+
+    if (orderBys.size() == 1 && ColumnHolder.TIME_COLUMN_NAME.equals(orderBys.get(0).getColumnName()) && getDataSource() instanceof InlineDataSource) {
+      //The type of datasource is inlineDataSource and order by __time
+      return true;
+    }
+
+    return false;

Review Comment:
   Perhaps clarify the rules a bit:
   
   ```java
   /**
     * Report whether the sort can be pushed into the Cursor, or must be done as a
     * separate sort step.
     */
   public boolean canPushSort()
   {
     // Can push non-existent sort.
     if (orderBys.size() == 0) {
      return true;
     }
     // Cursor can sort by only one column.
     if (orderBys.size() > 1) {
       return false;
     }
     // Inline datasources can't sort
     if (getDataSource() instanceof InlineDataSource) {
       return false;
     }
     // Cursor can't sort by the __time column
     return !ColumnHolder.TIME_COLUMN_NAME.equals(orderBys.get(0).getColumnName());
   }
   ```
   
   I'm not sure of the last two rules. Can an inline datasource do any sorting at all? The PR code suggests it can sort by a single column other than `_time`. Is this right?
   
   Is it also true that the Cursor can sort on any column other than time. But, I thought that segments are sorted by time, so you get `__time` "for free."



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryConfig.java:
##########
@@ -27,6 +27,7 @@
 {
   public static final String CTX_KEY_MAX_ROWS_QUEUED_FOR_ORDERING = "maxRowsQueuedForOrdering";
   public static final String CTX_KEY_MAX_SEGMENT_PARTITIONS_FOR_ORDERING = "maxSegmentPartitionsOrderedInMemory";
+  public static final String CTX_KEY_QUERY_RUNNER_TYPE = "orderByQueryRunnerType";

Review Comment:
   Is this something the user would pass in?



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java:
##########
@@ -90,13 +92,27 @@ public QueryRunner<ScanResultValue> mergeRunners(
     // in single thread and in Jetty thread instead of processing thread
     return (queryPlus, responseContext) -> {
       ScanQuery query = (ScanQuery) queryPlus.getQuery();
-      ScanQuery.verifyOrderByForNativeExecution(query);
 
       // Note: this variable is effective only when queryContext has a timeout.
       // See the comment of ResponseContext.Key.TIMEOUT_AT.
       final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery());
       responseContext.putTimeoutTime(timeoutAt);
 
+      if (query.scanOrderByNonTime()) {

Review Comment:
   Nit: maybe move the `scanOrderByNonTime()` into this class. The decision about what can be pushed to the `Cursor` and what has to be done by a separate sorter feels more like an implementation-level thing than a query-definition thing.



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java:
##########
@@ -90,13 +92,27 @@ public QueryRunner<ScanResultValue> mergeRunners(
     // in single thread and in Jetty thread instead of processing thread
     return (queryPlus, responseContext) -> {
       ScanQuery query = (ScanQuery) queryPlus.getQuery();
-      ScanQuery.verifyOrderByForNativeExecution(query);
 
       // Note: this variable is effective only when queryContext has a timeout.
       // See the comment of ResponseContext.Key.TIMEOUT_AT.
       final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery());
       responseContext.putTimeoutTime(timeoutAt);
 
+      if (query.scanOrderByNonTime()) {
+        try {
+          return multiColumnSort(
+              Sequences.concat(Sequences.map(
+                  Sequences.simple(Lists.newArrayList(queryRunners)),
+                  input -> input.run(queryPlus, responseContext)
+              )),
+              query
+          );
+        }
+        catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }

Review Comment:
   We're in the `mergeRunners` method, creating the merge step that combines previously-sorted, potentially-limited per-segment (or per-historical) results. So, this is where we want a merger, not a sort. There are merge sequences floating around: we just have to provide the comparator for the multi-column case. See further down in this method for code that creates the merger. (Yes, the names are confusing, and the code is overly complex, but the merger is there.)



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java:
##########
@@ -275,6 +296,66 @@ Sequence<ScanResultValue> stableLimitingSort(
     }
   }
 
+  Sequence<ScanResultValue> multiColumnSort(
+      Sequence<ScanResultValue> inputSequence,
+      ScanQuery scanQuery
+  ) throws IOException
+  {
+    //In some databases, the final result set size is set to 65535 without setting the limit. We can set the maximum value of Integer here
+    int limit;
+    if (scanQuery.getScanRowsLimit() > Integer.MAX_VALUE) {
+      limit = Integer.MAX_VALUE;
+    } else {
+      limit = Math.toIntExact(scanQuery.getScanRowsLimit());
+    }
+    // Converting the limit from long to int could theoretically throw an ArithmeticException but this branch
+    // only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE)
+    List<String> sortColumns = scanQuery.getOrderBys()
+                                        .stream()
+                                        .map(orderBy -> orderBy.getColumnName())
+                                        .collect(Collectors.toList());
+    Sorter<ScanResultValue> sorter = new QueueBasedSorter<>(limit, scanQuery.getOrderByNoneTimeResultOrdering());

Review Comment:
   I know Druid loves these anonymous inner classes, but you may find it easier to test if this is made into a named class. The code will also be easier to read as the number of nesting levels will be smaller.



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java:
##########
@@ -275,6 +296,66 @@ Sequence<ScanResultValue> stableLimitingSort(
     }
   }
 
+  Sequence<ScanResultValue> multiColumnSort(
+      Sequence<ScanResultValue> inputSequence,
+      ScanQuery scanQuery
+  ) throws IOException
+  {
+    //In some databases, the final result set size is set to 65535 without setting the limit. We can set the maximum value of Integer here
+    int limit;
+    if (scanQuery.getScanRowsLimit() > Integer.MAX_VALUE) {
+      limit = Integer.MAX_VALUE;
+    } else {
+      limit = Math.toIntExact(scanQuery.getScanRowsLimit());
+    }
+    // Converting the limit from long to int could theoretically throw an ArithmeticException but this branch
+    // only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE)
+    List<String> sortColumns = scanQuery.getOrderBys()
+                                        .stream()
+                                        .map(orderBy -> orderBy.getColumnName())
+                                        .collect(Collectors.toList());
+    Sorter<ScanResultValue> sorter = new QueueBasedSorter<>(limit, scanQuery.getOrderByNoneTimeResultOrdering());
+    Yielder<ScanResultValue> yielder = Yielders.each(inputSequence);
+    try {
+      boolean doneScanning = yielder.isDone();
+      // We need to scan limit elements and anything else in the last segment
+      while (!doneScanning) {
+        ScanResultValue next = yielder.get();
+        List<ScanResultValue> singleEventScanResultValues = next.toSingleEventScanResultValues();
+        for (ScanResultValue srv : singleEventScanResultValues) {
+          // Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list
+          // needs to be preserved for queries using the compactedList result format
+          List events = (List) (srv.getEvents());
+          for (Object event : events) {
+            List<Comparable> sortValues;
+            if (event instanceof LinkedHashMap) {
+              sortValues = sortColumns.stream()
+                                      .map(c -> ((LinkedHashMap<Object, Comparable>) event).get(c))
+                                      .collect(Collectors.toList());
+            } else {
+              sortValues = sortColumns.stream()
+                                      .map(c -> ((List<Comparable>) event).get(srv.getColumns().indexOf(c)))
+                                      .collect(Collectors.toList());
+            }
+            sorter.add(new Sorter.SorterElement<>(srv, sortValues));
+          }
+        }
+        yielder = yielder.next(null);
+        doneScanning = yielder.isDone();
+      }
+      final List<ScanResultValue> sortedElements = new ArrayList<>(sorter.size());
+      Iterators.addAll(sortedElements, sorter.drainElement());

Review Comment:
   Ideally, we'd assemble an array, sort it, and deliver results from it. That will save multiple copies of potentially large arrays. (Fortunately, we are only copying _pointers_, not the full rows.)



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryOrderByLimitRowIterator.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Ordering;
+import org.apache.druid.collections.QueueBasedSorter;
+import org.apache.druid.collections.Sorter;
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.context.ResponseContext;
+
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ScanQueryOrderByLimitRowIterator extends ScanQueryLimitRowIterator
+{
+
+  public ScanQueryOrderByLimitRowIterator(
+      QueryRunner<ScanResultValue> baseRunner,
+      QueryPlus<ScanResultValue> queryPlus,
+      ResponseContext responseContext
+  )
+  {
+    super(baseRunner, queryPlus, responseContext);
+  }
+
+  @Override
+  public boolean hasNext()
+  {
+    return !yielder.isDone();
+  }
+
+  @Override
+  public ScanResultValue next()
+  {
+    if (ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR.equals(resultFormat)) {
+      throw new UOE(ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR + " is not supported yet");
+    }
+    //In some databases, the final result set size is set to 65535 without setting the limit. We can set the maximum value of Integer here

Review Comment:
   Maybe leave off the comment about other DBs since, while true, it doesn't apply to this code. What actually happens is that, to avoid the need for "if limit is set do X else do Y" code, we just set the limit to larger than any possible result segment size. Since segments should be in the size of millions, having a limit of 2 billion is pretty safe: no segment will ever be that large.



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryOrderByLimitRowIterator.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Ordering;
+import org.apache.druid.collections.QueueBasedSorter;
+import org.apache.druid.collections.Sorter;
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.context.ResponseContext;
+
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ScanQueryOrderByLimitRowIterator extends ScanQueryLimitRowIterator
+{
+
+  public ScanQueryOrderByLimitRowIterator(
+      QueryRunner<ScanResultValue> baseRunner,
+      QueryPlus<ScanResultValue> queryPlus,
+      ResponseContext responseContext
+  )
+  {
+    super(baseRunner, queryPlus, responseContext);
+  }
+
+  @Override
+  public boolean hasNext()
+  {
+    return !yielder.isDone();
+  }
+
+  @Override
+  public ScanResultValue next()
+  {
+    if (ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR.equals(resultFormat)) {

Review Comment:
   For performance, we don't want to do this check per row. Also, it can't change per row. Suggestion: do the check during setup, not here.



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java:
##########
@@ -275,6 +296,66 @@ Sequence<ScanResultValue> stableLimitingSort(
     }
   }
 
+  Sequence<ScanResultValue> multiColumnSort(
+      Sequence<ScanResultValue> inputSequence,
+      ScanQuery scanQuery
+  ) throws IOException
+  {
+    //In some databases, the final result set size is set to 65535 without setting the limit. We can set the maximum value of Integer here
+    int limit;
+    if (scanQuery.getScanRowsLimit() > Integer.MAX_VALUE) {
+      limit = Integer.MAX_VALUE;
+    } else {
+      limit = Math.toIntExact(scanQuery.getScanRowsLimit());
+    }
+    // Converting the limit from long to int could theoretically throw an ArithmeticException but this branch
+    // only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE)
+    List<String> sortColumns = scanQuery.getOrderBys()
+                                        .stream()
+                                        .map(orderBy -> orderBy.getColumnName())
+                                        .collect(Collectors.toList());
+    Sorter<ScanResultValue> sorter = new QueueBasedSorter<>(limit, scanQuery.getOrderByNoneTimeResultOrdering());
+    Yielder<ScanResultValue> yielder = Yielders.each(inputSequence);
+    try {
+      boolean doneScanning = yielder.isDone();
+      // We need to scan limit elements and anything else in the last segment
+      while (!doneScanning) {
+        ScanResultValue next = yielder.get();
+        List<ScanResultValue> singleEventScanResultValues = next.toSingleEventScanResultValues();
+        for (ScanResultValue srv : singleEventScanResultValues) {
+          // Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list
+          // needs to be preserved for queries using the compactedList result format
+          List events = (List) (srv.getEvents());
+          for (Object event : events) {
+            List<Comparable> sortValues;
+            if (event instanceof LinkedHashMap) {
+              sortValues = sortColumns.stream()
+                                      .map(c -> ((LinkedHashMap<Object, Comparable>) event).get(c))
+                                      .collect(Collectors.toList());
+            } else {
+              sortValues = sortColumns.stream()
+                                      .map(c -> ((List<Comparable>) event).get(srv.getColumns().indexOf(c)))
+                                      .collect(Collectors.toList());
+            }
+            sorter.add(new Sorter.SorterElement<>(srv, sortValues));

Review Comment:
   I wonder, do we need the `SorterElement` class? I suspect that just having the `Object[]` array of column values, and a set of memorized column offsets, will be sufficient.
   
   Also, it looks like for each batch of events, we create a separate `SorterElement` this is OK, if we know that our sort key is `(__time, foo, bar, ...)` because the data is sorted in the segment by `__time`, so we will read rows in `__time` order. We only need to sort within the same time value for the other columns.
   
   But, if our sort key is `(foo, bar)` then we need to sort the full list, across all of the `ScanResultValue` batches returned from all the cursors for this segment.
   
   For now, I wouldn't worry about the leading-`__time` optimization, that can come later.
   
   Note that the unit tests won't uncover this issue because we have too few rows: you won't get multiple cursors or batches in unit tests, sadly.



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java:
##########
@@ -234,6 +234,7 @@ private Object getColumnValue(int i)
                             final Object value;
 
                             if (legacy && allColumns.get(i).equals(LEGACY_TIMESTAMP_KEY)) {
+                              Preconditions.checkNotNull(selector);

Review Comment:
   This is OK, and will throw an NPE if the selector is null. But, the very next line will be pretty good at throwing an NPT anyway if the selector is null.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org