Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2022/10/05 19:56:17 UTC

[GitHub] [druid] 599166320 opened a new pull request, #13168: ScanQuery supports multi column orderBy queries

599166320 opened a new pull request, #13168:
URL: https://github.com/apache/druid/pull/13168

   Fixes [#12958](https://github.com/apache/druid/issues/12958).
   
   ### Description
   
   1. Add sorting by normal columns
   2. Add sorting for inline data sources
   3. Fix [#13152](https://github.com/apache/druid/issues/13152): sorting of empty inline data
   4. Extract a sorter interface for future improvement
   5. By default, the data in a segment is sorted by `QueueBasedMultiColumnSorter`; other sorters can be selected through context parameters
   6. Merge results using `QueueBasedMultiColumnSorter`
   7. Following the sort style used for `__time`, add a priority queue strategy and an n-way merge strategy
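
The priority queue strategy named in the list above can be sketched as a bounded top-N sorter. This is only an illustration under stated assumptions: `BoundedTopNSorter` and its methods are hypothetical names, not the PR's `QueueBasedMultiColumnSorter`, and rows are modeled as plain `List<Object>` values.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of a queue-based sorter that keeps only the first `limit` rows.
final class BoundedTopNSorter
{
  private final int limit;
  private final Comparator<List<Object>> ordering;
  // Reversed ordering: the head of the queue is the row that sorts last,
  // so it is the first candidate for eviction.
  private final PriorityQueue<List<Object>> queue;

  BoundedTopNSorter(int limit, Comparator<List<Object>> ordering)
  {
    this.limit = limit;
    this.ordering = ordering;
    this.queue = new PriorityQueue<>(ordering.reversed());
  }

  void add(List<Object> row)
  {
    if (limit <= 0) {
      return; // nothing can be retained
    }
    if (queue.size() < limit) {
      queue.offer(row);
    } else if (ordering.compare(row, queue.peek()) < 0) {
      // The new row sorts before the current worst row: replace it.
      queue.poll();
      queue.offer(row);
    }
  }

  // Returns the retained rows in sorted order and empties the sorter.
  List<List<Object>> drain()
  {
    List<List<Object>> out = new ArrayList<>(queue);
    queue.clear();
    out.sort(ordering);
    return out;
  }
}
```

Memory stays proportional to the limit rather than to the number of rows scanned, which is the usual reason to prefer a queue over collecting and sorting everything.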
   
   
   <hr>
   
   ##### Key changed/added classes in this PR
    * `OrderByQueryRunner`
    * `OrderBySequence`
    * `QueueBasedMultiColumnSorter`
    * `TreeSetBasedOrderByQueryRunner`
    * `TreeMultisetBasedOrderByQueryRunner`
    * `TreeMultisetBasedMulticolumnSorter` 
    * `ScanQueryRunnerFactory`
    * `MultiColumnSorter`
    * `ScanQueryQueryToolChest`
   
    
   
   
   <hr>
   
   This PR has:
   - [x] been self-reviewed.
      - [x] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [x] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [x] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org




[GitHub] [druid] paul-rogers commented on a diff in pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r1046251965


##########
processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java:
##########
@@ -445,6 +432,63 @@ public Ordering<ScanResultValue> getResultOrdering()
     );
   }
 
+  public List<Integer> getSortColumnIdxs()

Review Comment:
   Nit: Idxs -> Indexes
   Druid likes to avoid abbreviations (hence ourReallyLongVariableNames.)



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java:
##########
@@ -445,6 +432,63 @@ public Ordering<ScanResultValue> getResultOrdering()
     );
   }
 
+  public List<Integer> getSortColumnIdxs()
+  {
+    List<String> allColumns;
+    if (legacy && !getColumns().contains(ScanQueryEngine.LEGACY_TIMESTAMP_KEY)) {
+      allColumns = new ArrayList<>(getColumns().size() + 1);
+      allColumns.add(ScanQueryEngine.LEGACY_TIMESTAMP_KEY);
+    } else {
+      allColumns = new ArrayList<>(getColumns().size());
+    }
+
+    allColumns.addAll(getColumns());
+    return getOrderBys()
+        .stream()
+        .map(orderBy -> allColumns.indexOf(orderBy.getColumnName()))
+        .collect(Collectors.toList());
+  }
+
+  public Ordering<List<Object>> getOrderByNoneTimeResultOrdering()

Review Comment:
   Nit: improve the method name. Is this saying to get the ordering in the case where we're not ordering by __time? If so, maybe `getGenericResultOrdering()` with the time being the specialized case.



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java:
##########
@@ -252,6 +266,127 @@ public void cleanup(Iterator<ScanResultValue> iterFromMake)
     );
   }
 
+  public Sequence<ScanResultValue> processWithMultiColumnSort(
+      ScanQuery query,
+      boolean legacy,
+      boolean hasTimeout,
+      long timeoutAt,
+      StorageAdapter adapter,
+      List<String> allColumns,
+      List<Interval> intervals,
+      SegmentId segmentId,
+      Filter filter,
+      @Nullable final QueryMetrics<?> queryMetrics
+  )
+  {
+
+    int limit;
+    if (query.getScanRowsLimit() > Integer.MAX_VALUE) {
+      limit = Integer.MAX_VALUE;
+    } else {
+      limit = Math.toIntExact(query.getScanRowsLimit());
+    }
+
+    return Sequences.concat(
+        adapter
+            .makeCursors(
+                filter,
+                intervals.get(0),
+                query.getVirtualColumns(),
+                Granularities.ALL,
+                query.getTimeOrder().equals(ScanQuery.Order.DESCENDING) ||
+                (query.getTimeOrder().equals(ScanQuery.Order.NONE) && query.isDescending()),
+                queryMetrics
+            )
+            .map(cursor -> new BaseSequence<>(
+                new BaseSequence.IteratorMaker<ScanResultValue, Iterator<ScanResultValue>>()
+                {
+                  @Override
+                  public Iterator<ScanResultValue> make()
+                  {
+                    final List<BaseObjectColumnValueSelector> columnSelectors = new ArrayList<>(allColumns.size());
+
+                    for (String column : allColumns) {
+                      final BaseObjectColumnValueSelector selector;
+
+                      if (legacy && LEGACY_TIMESTAMP_KEY.equals(column)) {
+                        selector = cursor.getColumnSelectorFactory()
+                                         .makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
+                      } else {
+                        selector = cursor.getColumnSelectorFactory().makeColumnValueSelector(column);
+                      }
+
+                      columnSelectors.add(selector);
+                    }
+
+                    return new Iterator<ScanResultValue>()
+                    {
+                      @Override
+                      public boolean hasNext()
+                      {
+                        return !cursor.isDone();
+                      }
+
+                      @Override
+                      public ScanResultValue next()
+                      {
+                        if (!hasNext()) {
+                          throw new NoSuchElementException();
+                        }
+                        if (hasTimeout && System.currentTimeMillis() >= timeoutAt) {
+                          throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId()));
+                        }
+                        Sorter<Object> sorter = new QueueBasedSorter<>(limit, query.getOrderByNoneTimeResultOrdering());
+                        rowsToCompactedList(sorter);
+                        final List<List<Object>> sortedElements = new ArrayList<>(sorter.size());
+                        Iterators.addAll(sortedElements, sorter.drainElement());
+                        return new ScanResultValue(segmentId.toString(), allColumns, sortedElements);
+                      }
+
+                      @Override
+                      public void remove()
+                      {
+                        throw new UnsupportedOperationException();
+                      }
+
+                      private void rowsToCompactedList(Sorter<Object> sorter)

Review Comment:
   This isn't really a `rowsToCompactedList` is it? More like a `rowsToSorter`?



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java:
##########
@@ -496,6 +540,30 @@ public ScanQuery withOverriddenContext(Map<String, Object> contextOverrides)
     return Druids.ScanQueryBuilder.copy(this).context(computeOverriddenContext(getContext(), contextOverrides)).build();
   }
 
+  /**
+   * Report whether the sort can be pushed into the Cursor, or must be done as a
+   * separate sort step.
+   */
+  public boolean canPushSort()

Review Comment:
   Nit: `ScanQuery` is a public class. The public API is cleaner if we separate implementation details from the query definition. Suggestion: move these new methods elsewhere, say to a `ScanQueries` class or to one of the implementation classes.



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java:
##########
@@ -252,6 +266,127 @@ public void cleanup(Iterator<ScanResultValue> iterFromMake)
     );
   }
 
+  public Sequence<ScanResultValue> processWithMultiColumnSort(
+      ScanQuery query,
+      boolean legacy,
+      boolean hasTimeout,
+      long timeoutAt,
+      StorageAdapter adapter,
+      List<String> allColumns,
+      List<Interval> intervals,
+      SegmentId segmentId,
+      Filter filter,
+      @Nullable final QueryMetrics<?> queryMetrics
+  )
+  {
+
+    int limit;
+    if (query.getScanRowsLimit() > Integer.MAX_VALUE) {
+      limit = Integer.MAX_VALUE;
+    } else {
+      limit = Math.toIntExact(query.getScanRowsLimit());
+    }
+
+    return Sequences.concat(
+        adapter
+            .makeCursors(
+                filter,
+                intervals.get(0),
+                query.getVirtualColumns(),
+                Granularities.ALL,
+                query.getTimeOrder().equals(ScanQuery.Order.DESCENDING) ||
+                (query.getTimeOrder().equals(ScanQuery.Order.NONE) && query.isDescending()),
+                queryMetrics
+            )
+            .map(cursor -> new BaseSequence<>(
+                new BaseSequence.IteratorMaker<ScanResultValue, Iterator<ScanResultValue>>()
+                {
+                  @Override
+                  public Iterator<ScanResultValue> make()
+                  {
+                    final List<BaseObjectColumnValueSelector> columnSelectors = new ArrayList<>(allColumns.size());
+
+                    for (String column : allColumns) {
+                      final BaseObjectColumnValueSelector selector;
+
+                      if (legacy && LEGACY_TIMESTAMP_KEY.equals(column)) {
+                        selector = cursor.getColumnSelectorFactory()
+                                         .makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
+                      } else {
+                        selector = cursor.getColumnSelectorFactory().makeColumnValueSelector(column);
+                      }
+
+                      columnSelectors.add(selector);
+                    }
+
+                    return new Iterator<ScanResultValue>()
+                    {
+                      @Override
+                      public boolean hasNext()
+                      {
+                        return !cursor.isDone();
+                      }
+
+                      @Override
+                      public ScanResultValue next()
+                      {
+                        if (!hasNext()) {
+                          throw new NoSuchElementException();
+                        }
+                        if (hasTimeout && System.currentTimeMillis() >= timeoutAt) {
+                          throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId()));
+                        }
+                        Sorter<Object> sorter = new QueueBasedSorter<>(limit, query.getOrderByNoneTimeResultOrdering());
+                        rowsToCompactedList(sorter);
+                        final List<List<Object>> sortedElements = new ArrayList<>(sorter.size());
+                        Iterators.addAll(sortedElements, sorter.drainElement());
+                        return new ScanResultValue(segmentId.toString(), allColumns, sortedElements);
+                      }
+
+                      @Override
+                      public void remove()
+                      {
+                        throw new UnsupportedOperationException();
+                      }
+
+                      private void rowsToCompactedList(Sorter<Object> sorter)
+                      {
+                        for (; !cursor.isDone(); cursor.advance()) {

Review Comment:
   We want to apply a limit, yet this loop is unlimited. Are we relying on the cursor to apply the limit?
   
   A scan query can return multiple cursors (one for each time chunk, I believe; I'm a bit unclear on the details.) In that case, the limit has to be adjusted for each; or we need to enforce an outer limit here.
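
One way to picture the outer limit: a minimal sketch, assuming each cursor has already produced its own sorted list of at most `limit` rows. `OuterLimit` and `mergeWithLimit` are hypothetical names used only for illustration; a real implementation would more likely stream an n-way merge than copy and re-sort.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: apply one limit across the results of several cursors.
final class OuterLimit
{
  private OuterLimit() {}

  static <T> List<T> mergeWithLimit(List<List<T>> perCursorRows, Comparator<? super T> ordering, int limit)
  {
    List<T> all = new ArrayList<>();
    for (List<T> rows : perCursorRows) {
      all.addAll(rows);
    }
    all.sort(ordering);
    // Truncate to the query-level limit, regardless of how many cursors contributed.
    int end = Math.max(0, Math.min(limit, all.size()));
    return new ArrayList<>(all.subList(0, end));
  }
}
```

Without the final truncation, two cursors with a limit of 3 could return up to 6 rows.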



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java:
##########
@@ -252,6 +266,127 @@ public void cleanup(Iterator<ScanResultValue> iterFromMake)
     );
   }
 
+  public Sequence<ScanResultValue> processWithMultiColumnSort(
+      ScanQuery query,
+      boolean legacy,
+      boolean hasTimeout,
+      long timeoutAt,
+      StorageAdapter adapter,
+      List<String> allColumns,
+      List<Interval> intervals,
+      SegmentId segmentId,
+      Filter filter,
+      @Nullable final QueryMetrics<?> queryMetrics
+  )
+  {
+
+    int limit;
+    if (query.getScanRowsLimit() > Integer.MAX_VALUE) {
+      limit = Integer.MAX_VALUE;
+    } else {
+      limit = Math.toIntExact(query.getScanRowsLimit());
+    }
+
+    return Sequences.concat(
+        adapter
+            .makeCursors(
+                filter,
+                intervals.get(0),
+                query.getVirtualColumns(),
+                Granularities.ALL,
+                query.getTimeOrder().equals(ScanQuery.Order.DESCENDING) ||
+                (query.getTimeOrder().equals(ScanQuery.Order.NONE) && query.isDescending()),
+                queryMetrics
+            )
+            .map(cursor -> new BaseSequence<>(
+                new BaseSequence.IteratorMaker<ScanResultValue, Iterator<ScanResultValue>>()
+                {
+                  @Override
+                  public Iterator<ScanResultValue> make()
+                  {
+                    final List<BaseObjectColumnValueSelector> columnSelectors = new ArrayList<>(allColumns.size());
+
+                    for (String column : allColumns) {
+                      final BaseObjectColumnValueSelector selector;
+
+                      if (legacy && LEGACY_TIMESTAMP_KEY.equals(column)) {
+                        selector = cursor.getColumnSelectorFactory()
+                                         .makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
+                      } else {
+                        selector = cursor.getColumnSelectorFactory().makeColumnValueSelector(column);
+                      }
+
+                      columnSelectors.add(selector);
+                    }
+
+                    return new Iterator<ScanResultValue>()
+                    {
+                      @Override
+                      public boolean hasNext()
+                      {
+                        return !cursor.isDone();
+                      }
+
+                      @Override
+                      public ScanResultValue next()
+                      {
+                        if (!hasNext()) {
+                          throw new NoSuchElementException();
+                        }
+                        if (hasTimeout && System.currentTimeMillis() >= timeoutAt) {
+                          throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId()));
+                        }
+                        Sorter<Object> sorter = new QueueBasedSorter<>(limit, query.getOrderByNoneTimeResultOrdering());
+                        rowsToCompactedList(sorter);
+                        final List<List<Object>> sortedElements = new ArrayList<>(sorter.size());
+                        Iterators.addAll(sortedElements, sorter.drainElement());
+                        return new ScanResultValue(segmentId.toString(), allColumns, sortedElements);
+                      }
+
+                      @Override
+                      public void remove()

Review Comment:
   This method has had a default implementation in Java for quite a few years now, as it turns out.
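
For reference, `Iterator.remove()` has been a default method since Java 8, and the default throws `UnsupportedOperationException`. A small self-contained illustration (the `CountingIterator` class is made up for this example):

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Illustrative iterator over 0..end-1 that does not override remove().
final class CountingIterator implements Iterator<Integer>
{
  private final int end;
  private int current;

  CountingIterator(int end)
  {
    this.end = end;
  }

  @Override
  public boolean hasNext()
  {
    return current < end;
  }

  @Override
  public Integer next()
  {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return current++;
  }

  // No remove() override: the inherited default already throws
  // UnsupportedOperationException.
}
```

So the explicit override in the diff can simply be dropped without changing behavior.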



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryOrderBySequence.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.common.guava.GuavaUtils;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.BaseSequence;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.AbstractPrioritizedQueryRunnerCallable;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.QueryInterruptedException;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryProcessingPool;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryTimeoutException;
+import org.apache.druid.query.context.ResponseContext;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Scan the segments in parallel, complete the sorting of each batch within each segment, and then complete the sorting of each segment level
+ */
+public class ScanQueryOrderBySequence extends BaseSequence<ScanResultValue, Iterator<ScanResultValue>>
+{
+  public ScanQueryOrderBySequence(
+      QueryPlus<ScanResultValue> queryPlus,
+      QueryProcessingPool queryProcessingPool,
+      Iterable<QueryRunner<ScanResultValue>> queryables,
+      ResponseContext responseContext
+  )
+  {
+    super(new ScanQueryOrderByIteratorMaker(queryPlus, queryProcessingPool, queryables, responseContext));
+  }
+
+  static class ScanQueryOrderByIteratorMaker
+      implements BaseSequence.IteratorMaker<ScanResultValue, Iterator<ScanResultValue>>
+  {
+    private static final Logger log = new Logger(ScanQueryOrderByIteratorMaker.class);
+    private final QueryPlus<ScanResultValue> threadSafeQueryPlus;
+    private final ResponseContext responseContext;
+    private final Iterable<QueryRunner<ScanResultValue>> queryables;
+    private final QueryProcessingPool queryProcessingPool;
+
+    ScanQueryOrderByIteratorMaker(
+        QueryPlus<ScanResultValue> queryPlus,
+        QueryProcessingPool queryProcessingPool,
+        Iterable<QueryRunner<ScanResultValue>> queryables,
+        ResponseContext responseContext
+    )
+    {
+      this.threadSafeQueryPlus = queryPlus.withoutThreadUnsafeState();
+      this.responseContext = responseContext;
+      this.queryables = queryables;
+      this.queryProcessingPool = queryProcessingPool;
+    }
+
+    @Override
+    public Iterator<ScanResultValue> make()
+    {
+      ScanQuery query = (ScanQuery) threadSafeQueryPlus.getQuery();
+      QueryContext queryContext = QueryContext.of(query.getContext());
+      List<ListenableFuture<ScanResultValue>> futures =
+          Lists.newArrayList(
+              Iterables.transform(
+                  queryables,
+                  input -> {
+                    if (input == null) {
+                      throw new ISE("Null queryRunner! Looks to be some segment unmapping action happening");
+                    }
+
+                    return queryProcessingPool.submitRunnerTask(
+                        new AbstractPrioritizedQueryRunnerCallable<ScanResultValue, ScanResultValue>(
+                            queryContext.getPriority(),
+                            input
+                        )
+                        {
+                          @Override
+                          public ScanResultValue call()
+                          {
+                            try {
+                              Sequence<ScanResultValue> result = input.run(threadSafeQueryPlus, responseContext);
+                              if (result == null) {
+                                throw new ISE("Got a null result! Segments are missing!");
+                              }
+
+                              Iterator<ScanResultValue> it = result.toList().iterator();
+                              List<List<Object>> eventList = new ArrayList<>();
+                              List<String> columns = new ArrayList<>();
+                              String segmentId = null;
+                              while (it.hasNext()) {

Review Comment:
   Maybe a comment to explain what we're doing? Looks like we read a set of scan result values and combine their rows into one big scan result value. Again, the idea with scan query is to limit batch size. Does something elsewhere break the results back into batches? Or maybe copy these values into a sorter? If they are copied into a sorter, should that be done here, with the results merged later?
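
To make the batching concern concrete, here is a minimal sketch of breaking one large row list back into bounded batches. `Batches.split` is a hypothetical helper, not something in the PR or in Druid, and the batch size would presumably come from the query.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: split a large list of rows into batches of bounded size.
final class Batches
{
  private Batches() {}

  static <T> List<List<T>> split(List<T> rows, int batchSize)
  {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<List<T>> batches = new ArrayList<>();
    for (int start = 0; start < rows.size(); start += batchSize) {
      int end = Math.min(start + batchSize, rows.size());
      // Copy the slice so each batch is independent of the source list.
      batches.add(new ArrayList<>(rows.subList(start, end)));
    }
    return batches;
  }
}
```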



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java:
##########
@@ -66,7 +72,12 @@ public Sequence<ScanResultValue> process(
       @Nullable final QueryMetrics<?> queryMetrics
   )
   {
-    if (segment.asQueryableIndex() != null && segment.asQueryableIndex().isFromTombstone()) {
+    QueryableIndex queryableIndex = segment.asQueryableIndex();
+    if (!query.canPushSort() && Objects.nonNull(queryableIndex)) {
+      return new QueryableIndexOrderbyRunner().process(query, segment, responseContext, queryMetrics, queryableIndex);
+    }
+
+    if (queryableIndex != null && queryableIndex.isFromTombstone()) {

Review Comment:
   Probably want to do this first? There is nothing to sort in a tombstone.



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java:
##########
@@ -496,6 +540,30 @@ public ScanQuery withOverriddenContext(Map<String, Object> contextOverrides)
     return Druids.ScanQueryBuilder.copy(this).context(computeOverriddenContext(getContext(), contextOverrides)).build();
   }
 
+  /**
+   * Report whether the sort can be pushed into the Cursor, or must be done as a
+   * separate sort step.
+   */
+  public boolean canPushSort()
+  {
+    // Can push non-existent sort.
+    if (orderBys.size() == 0) {
+      return true;
+    }
+    // Cursor can sort by only one column.
+    if (orderBys.size() > 1) {
+      return false;
+    }
+    // Inline datasources can't sort
+    if (getDataSource() instanceof InlineDataSource) {
+      return false;
+    }
+    // Cursor can't sort by the __time column
+    return ColumnHolder.TIME_COLUMN_NAME.equals(orderBys.get(0).getColumnName());

Review Comment:
   I thought the cursor _can_ sort by __time (since that's how the data is organized), but not by anything else?



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java:
##########
@@ -445,6 +432,63 @@ public Ordering<ScanResultValue> getResultOrdering()
     );
   }
 
+  public List<Integer> getSortColumnIdxs()
+  {
+    List<String> allColumns;
+    if (legacy && !getColumns().contains(ScanQueryEngine.LEGACY_TIMESTAMP_KEY)) {
+      allColumns = new ArrayList<>(getColumns().size() + 1);
+      allColumns.add(ScanQueryEngine.LEGACY_TIMESTAMP_KEY);
+    } else {
+      allColumns = new ArrayList<>(getColumns().size());
+    }
+
+    allColumns.addAll(getColumns());
+    return getOrderBys()
+        .stream()
+        .map(orderBy -> allColumns.indexOf(orderBy.getColumnName()))

Review Comment:
   Here, has some prior code ensured that the order by contains only column names, and that each column name is projected? Else, we'll have -1 values in the list.
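
A minimal sketch of the kind of check I have in mind, assuming validation is done where the indexes are resolved. `SortColumnIndexes.resolve` is a hypothetical helper; the only library behavior relied on is that `List.indexOf` returns -1 for a missing element.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: resolve order-by column names to positions, rejecting unknown names.
final class SortColumnIndexes
{
  private SortColumnIndexes() {}

  static List<Integer> resolve(List<String> projectedColumns, List<String> orderByColumns)
  {
    List<Integer> indexes = new ArrayList<>(orderByColumns.size());
    for (String name : orderByColumns) {
      int index = projectedColumns.indexOf(name);
      if (index < 0) {
        // Fail early instead of letting -1 reach row.get(index) later.
        throw new IllegalArgumentException("Order-by column [" + name + "] is not in the projected columns");
      }
      indexes.add(index);
    }
    return indexes;
  }
}
```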



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryOrderByMergeRowIterator.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Ordering;
+import org.apache.druid.collections.QueueBasedSorter;
+import org.apache.druid.collections.Sorter;
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.utils.CollectionUtils;
+
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ScanQueryOrderByMergeRowIterator extends ScanQueryLimitRowIterator
+{
+
+  public ScanQueryOrderByMergeRowIterator(
+      QueryRunner<ScanResultValue> baseRunner,
+      QueryPlus<ScanResultValue> queryPlus,
+      ResponseContext responseContext
+  )
+  {
+    super(baseRunner, queryPlus, responseContext);
+    if (ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR.equals(resultFormat)) {
+      throw new UOE(ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR + " is not supported yet");
+    }
+  }
+
+  @Override
+  public boolean hasNext()
+  {
+    return !yielder.isDone();
+  }
+
+  @Override
+  public ScanResultValue next()
+  {
+
+    final int scanRowsLimit;
+    if (query.getScanRowsLimit() > Integer.MAX_VALUE) {
+      scanRowsLimit = Integer.MAX_VALUE;
+    } else {
+      scanRowsLimit = Math.toIntExact(query.getScanRowsLimit());
+    }
+
+    List<String> orderByDirection = query.getOrderBys()
+                                         .stream()
+                                         .map(orderBy -> orderBy.getOrder().toString())
+                                         .collect(Collectors.toList());
+    List<Integer> sortColumnIdxs = query.getSortColumnIdxs();
+
+    Ordering<Comparable>[] orderings = new Ordering[orderByDirection.size()];

Review Comment:
   In an ideal world, this code wouldn't be copy/pasted here and above. Maybe have one copy, in that "scan query utils" class: `ScanQueries` or `ScanQueryUtils`.
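
   As a minimal sketch of what such a shared helper might look like: the limit clamping just above this line is one of the repeated pieces. The class name `ScanQueryUtilsSketch` and the method `clampToInt` are hypothetical and do not exist in the PR; the sketch assumes the limit is never negative.

```java
final class ScanQueryUtilsSketch
{
  private ScanQueryUtilsSketch()
  {
    // static helpers only
  }

  /**
   * Clamps a long row limit to the int range.
   * Assumption: the limit is non-negative, as a scan row limit should be.
   */
  static int clampToInt(long scanRowsLimit)
  {
    return (int) Math.min(scanRowsLimit, (long) Integer.MAX_VALUE);
  }
}
```

   Each call site would then reduce to a single line such as `int limit = ScanQueryUtilsSketch.clampToInt(query.getScanRowsLimit());`.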



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java:
##########
@@ -252,6 +266,127 @@ public void cleanup(Iterator<ScanResultValue> iterFromMake)
     );
   }
 
+  public Sequence<ScanResultValue> processWithMultiColumnSort(
+      ScanQuery query,
+      boolean legacy,
+      boolean hasTimeout,
+      long timeoutAt,
+      StorageAdapter adapter,
+      List<String> allColumns,
+      List<Interval> intervals,
+      SegmentId segmentId,
+      Filter filter,
+      @Nullable final QueryMetrics<?> queryMetrics
+  )
+  {
+
+    int limit;
+    if (query.getScanRowsLimit() > Integer.MAX_VALUE) {
+      limit = Integer.MAX_VALUE;
+    } else {
+      limit = Math.toIntExact(query.getScanRowsLimit());
+    }
+
+    return Sequences.concat(
+        adapter
+            .makeCursors(
+                filter,
+                intervals.get(0),
+                query.getVirtualColumns(),
+                Granularities.ALL,
+                query.getTimeOrder().equals(ScanQuery.Order.DESCENDING) ||
+                (query.getTimeOrder().equals(ScanQuery.Order.NONE) && query.isDescending()),
+                queryMetrics
+            )
+            .map(cursor -> new BaseSequence<>(
+                new BaseSequence.IteratorMaker<ScanResultValue, Iterator<ScanResultValue>>()
+                {
+                  @Override
+                  public Iterator<ScanResultValue> make()
+                  {
+                    final List<BaseObjectColumnValueSelector> columnSelectors = new ArrayList<>(allColumns.size());
+
+                    for (String column : allColumns) {
+                      final BaseObjectColumnValueSelector selector;
+
+                      if (legacy && LEGACY_TIMESTAMP_KEY.equals(column)) {
+                        selector = cursor.getColumnSelectorFactory()
+                                         .makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
+                      } else {
+                        selector = cursor.getColumnSelectorFactory().makeColumnValueSelector(column);
+                      }
+
+                      columnSelectors.add(selector);
+                    }
+
+                    return new Iterator<ScanResultValue>()
+                    {
+                      @Override
+                      public boolean hasNext()
+                      {
+                        return !cursor.isDone();
+                      }
+
+                      @Override
+                      public ScanResultValue next()
+                      {
+                        if (!hasNext()) {
+                          throw new NoSuchElementException();
+                        }
+                        if (hasTimeout && System.currentTimeMillis() >= timeoutAt) {
+                          throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId()));
+                        }
+                        Sorter<Object> sorter = new QueueBasedSorter<>(limit, query.getOrderByNoneTimeResultOrdering());
+                        rowsToCompactedList(sorter);
+                        final List<List<Object>> sortedElements = new ArrayList<>(sorter.size());
+                        Iterators.addAll(sortedElements, sorter.drainElement());
+                        return new ScanResultValue(segmentId.toString(), allColumns, sortedElements);
+                      }
+
+                      @Override
+                      public void remove()
+                      {
+                        throw new UnsupportedOperationException();
+                      }
+
+                      private void rowsToCompactedList(Sorter<Object> sorter)
+                      {
+                        for (; !cursor.isDone(); cursor.advance()) {
+                          final List<Object> theEvent = new ArrayList<>(allColumns.size());

Review Comment:
   We're buffering the values. Using a list of objects is rather expensive. We know the column count. Perhaps we can use a simple array as our buffered row value. Translate that value to either a list or map when creating the scan result values.
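
   A rough sketch of the array-based buffering, under stated assumptions: `Supplier<Object>` stands in for the column value selectors (it is not the Druid selector type), and the class and method names are hypothetical. Rows stay as `Object[]` while buffered and are only wrapped as lists when the result value is built.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

final class ArrayRowBufferSketch
{
  /** Reads the current value of every column into a fixed-size array. */
  static Object[] readRow(List<Supplier<Object>> selectors)
  {
    final Object[] row = new Object[selectors.size()];
    for (int i = 0; i < row.length; i++) {
      row[i] = selectors.get(i).get();
    }
    return row;
  }

  /** Wraps buffered rows as lists only at the point where events are emitted. */
  static List<List<Object>> toCompactedList(List<Object[]> bufferedRows)
  {
    final List<List<Object>> events = new ArrayList<>(bufferedRows.size());
    for (Object[] row : bufferedRows) {
      events.add(Arrays.asList(row)); // a fixed-size view over the array, no copy
    }
    return events;
  }
}
```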



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java:
##########
@@ -445,6 +432,63 @@ public Ordering<ScanResultValue> getResultOrdering()
     );
   }
 
+  public List<Integer> getSortColumnIdxs()
+  {
+    List<String> allColumns;
+    if (legacy && !getColumns().contains(ScanQueryEngine.LEGACY_TIMESTAMP_KEY)) {
+      allColumns = new ArrayList<>(getColumns().size() + 1);
+      allColumns.add(ScanQueryEngine.LEGACY_TIMESTAMP_KEY);
+    } else {
+      allColumns = new ArrayList<>(getColumns().size());
+    }
+
+    allColumns.addAll(getColumns());
+    return getOrderBys()
+        .stream()
+        .map(orderBy -> allColumns.indexOf(orderBy.getColumnName()))
+        .collect(Collectors.toList());
+  }
+
+  public Ordering<List<Object>> getOrderByNoneTimeResultOrdering()
+  {
+    List<String> orderByDirection = getOrderBys().stream()
+                                                 .map(orderBy -> orderBy.getOrder().toString())
+                                                 .collect(Collectors.toList());
+
+
+    Ordering<Comparable>[] orderings = new Ordering[orderByDirection.size()];
+    for (int i = 0; i < orderByDirection.size(); i++) {
+      orderings[i] = ScanQuery.Order.ASCENDING.equals(ScanQuery.Order.fromString(orderByDirection.get(i)))
+                     ? Comparators.naturalNullsFirst()
+                     : Comparators.<Comparable>naturalNullsFirst().reverse();
+    }
+
+    Comparator<List<Object>> comparator = new Comparator<List<Object>>()
+    {
+
+      List<Integer> sortColumnIdxs = getSortColumnIdxs();
+
+      @Override
+      public int compare(
+          List<Object> o1,
+          List<Object> o2
+      )
+      {
+        for (int i = 0; i < sortColumnIdxs.size(); i++) {
+          int compare = orderings[i].compare(
+              (Comparable) o1.get(sortColumnIdxs.get(i)),
+              (Comparable) o2.get(sortColumnIdxs.get(i))

Review Comment:
   Nit: since this is an inner loop, optimize a bit. Store the indexes in an array (not list). Compute the list once per run, not once per comparison.
   
   Then, get the index once per loop pass, not twice.
   
   That is, precompute as much as possible.
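
   For illustration only, a comparator along these lines might look like the sketch below. The names are hypothetical, and a hand-written nulls-first comparison stands in for `Comparators.naturalNullsFirst()`; descending columns use the reversed comparison, so nulls sort last for them, as with `.reverse()` in the PR.

```java
import java.util.Comparator;
import java.util.List;

final class RowComparatorSketch
{
  @SuppressWarnings("unchecked")
  private static int compareNullsFirst(Object a, Object b)
  {
    if (a == null) {
      return b == null ? 0 : -1;
    }
    if (b == null) {
      return 1;
    }
    return ((Comparable<Object>) a).compareTo(b);
  }

  static Comparator<List<Object>> forColumns(int[] sortColumnIdxs, boolean[] ascending)
  {
    // Computed once, when the comparator is built, not once per comparison.
    final int[] idxs = sortColumnIdxs.clone();
    final boolean[] asc = ascending.clone();
    return (row1, row2) -> {
      for (int i = 0; i < idxs.length; i++) {
        final int idx = idxs[i]; // fetched once per loop pass
        final int c = compareNullsFirst(row1.get(idx), row2.get(idx));
        if (c != 0) {
          return asc[i] ? c : -Integer.signum(c);
        }
      }
      return 0;
    };
  }
}
```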



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryOrderBySequence.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.common.guava.GuavaUtils;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.BaseSequence;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.AbstractPrioritizedQueryRunnerCallable;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.QueryInterruptedException;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryProcessingPool;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryTimeoutException;
+import org.apache.druid.query.context.ResponseContext;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Scan the segments in parallel, complete the sorting of each batch within each segment, and then complete the sorting of each segment level
+ */

Review Comment:
   Let's think about this.
   
   Druid already handles parallel execution of segment scans within a historical. Are we redoing that here? Or, did the previous code not actually run in parallel?
   
   We can't run _cursors_ in parallel because they scan the same segment: in a low-memory condition, we might thrash the OS cache as we try to load all the segment slices in memory at the same time.
   
   Druid's concurrency model is to run a limited number of threads, with those threads picking up tasks ready to run. If we have, say, 20 cursors for a segment, we'll have 20 tasks competing for a small number of threads. Will the extra complexity actually be faster than running the cursors in sequence?
   
   Would be great to explain the parallelism model we're implementing to answer those questions for future readers.



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java:
##########
@@ -252,6 +266,127 @@ public void cleanup(Iterator<ScanResultValue> iterFromMake)
     );
   }
 
+  public Sequence<ScanResultValue> processWithMultiColumnSort(
+      ScanQuery query,
+      boolean legacy,
+      boolean hasTimeout,
+      long timeoutAt,
+      StorageAdapter adapter,
+      List<String> allColumns,
+      List<Interval> intervals,
+      SegmentId segmentId,
+      Filter filter,
+      @Nullable final QueryMetrics<?> queryMetrics
+  )
+  {
+
+    int limit;
+    if (query.getScanRowsLimit() > Integer.MAX_VALUE) {
+      limit = Integer.MAX_VALUE;
+    } else {
+      limit = Math.toIntExact(query.getScanRowsLimit());
+    }
+
+    return Sequences.concat(
+        adapter
+            .makeCursors(
+                filter,
+                intervals.get(0),
+                query.getVirtualColumns(),
+                Granularities.ALL,
+                query.getTimeOrder().equals(ScanQuery.Order.DESCENDING) ||
+                (query.getTimeOrder().equals(ScanQuery.Order.NONE) && query.isDescending()),
+                queryMetrics
+            )
+            .map(cursor -> new BaseSequence<>(
+                new BaseSequence.IteratorMaker<ScanResultValue, Iterator<ScanResultValue>>()
+                {
+                  @Override
+                  public Iterator<ScanResultValue> make()
+                  {
+                    final List<BaseObjectColumnValueSelector> columnSelectors = new ArrayList<>(allColumns.size());
+
+                    for (String column : allColumns) {
+                      final BaseObjectColumnValueSelector selector;
+
+                      if (legacy && LEGACY_TIMESTAMP_KEY.equals(column)) {
+                        selector = cursor.getColumnSelectorFactory()
+                                         .makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
+                      } else {
+                        selector = cursor.getColumnSelectorFactory().makeColumnValueSelector(column);
+                      }
+
+                      columnSelectors.add(selector);
+                    }
+
+                    return new Iterator<ScanResultValue>()
+                    {
+                      @Override
+                      public boolean hasNext()
+                      {
+                        return !cursor.isDone();
+                      }
+
+                      @Override
+                      public ScanResultValue next()
+                      {
+                        if (!hasNext()) {
+                          throw new NoSuchElementException();
+                        }
+                        if (hasTimeout && System.currentTimeMillis() >= timeoutAt) {
+                          throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId()));
+                        }
+                        Sorter<Object> sorter = new QueueBasedSorter<>(limit, query.getOrderByNoneTimeResultOrdering());
+                        rowsToCompactedList(sorter);
+                        final List<List<Object>> sortedElements = new ArrayList<>(sorter.size());
+                        Iterators.addAll(sortedElements, sorter.drainElement());

Review Comment:
   Two items. First, Scan query supports two internal formats: a list (really, a map) and a compact list. We probably need to support both as the output format for the scan result value. Unless someone can confirm that we never use the non-compact list format any longer.
   
   Second, scan query has a batch size. When returning scan result values, we need to divide up the results into batches no larger than that batch size. This means your iterator will be called more than once if the row count exceeds the batch size.
   
   Although Druid loves deeply nested inner classes, your reviewers would appreciate if such complex classes were converted to top-level classes: either in their own files, or as a nested, named class. The usual arguments against excessive code complexity apply.



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryOrderByMergeRowIterator.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Ordering;
+import org.apache.druid.collections.QueueBasedSorter;
+import org.apache.druid.collections.Sorter;
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.utils.CollectionUtils;
+
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ScanQueryOrderByMergeRowIterator extends ScanQueryLimitRowIterator
+{
+
+  public ScanQueryOrderByMergeRowIterator(
+      QueryRunner<ScanResultValue> baseRunner,
+      QueryPlus<ScanResultValue> queryPlus,
+      ResponseContext responseContext
+  )
+  {
+    super(baseRunner, queryPlus, responseContext);
+    if (ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR.equals(resultFormat)) {
+      throw new UOE(ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR + " is not supported yet");
+    }
+  }
+
+  @Override
+  public boolean hasNext()
+  {
+    return !yielder.isDone();
+  }
+
+  @Override
+  public ScanResultValue next()
+  {
+
+    final int scanRowsLimit;
+    if (query.getScanRowsLimit() > Integer.MAX_VALUE) {
+      scanRowsLimit = Integer.MAX_VALUE;
+    } else {
+      scanRowsLimit = Math.toIntExact(query.getScanRowsLimit());
+    }
+
+    List<String> orderByDirection = query.getOrderBys()
+                                         .stream()
+                                         .map(orderBy -> orderBy.getOrder().toString())
+                                         .collect(Collectors.toList());
+    List<Integer> sortColumnIdxs = query.getSortColumnIdxs();
+
+    Ordering<Comparable>[] orderings = new Ordering[orderByDirection.size()];
+    for (int i = 0; i < orderByDirection.size(); i++) {
+      orderings[i] = ScanQuery.Order.ASCENDING.equals(ScanQuery.Order.fromString(orderByDirection.get(i)))
+                     ? Comparators.naturalNullsFirst()
+                     : Comparators.<Comparable>naturalNullsFirst().reverse();
+    }
+
+    Comparator<List<Object>> comparator = (o1, o2) -> {
+      for (int i = 0; i < sortColumnIdxs.size(); i++) {
+        int compare = orderings[i].compare(
+            (Comparable) o1.get(sortColumnIdxs.get(i)),
+            (Comparable) o2.get(sortColumnIdxs.get(i))
+        );
+        if (compare != 0) {
+          return compare;
+        }
+      }
+      return 0;
+    };
+    Sorter<Object> sorter = new QueueBasedSorter<Object>(scanRowsLimit, comparator);
+    List<String> columns = new ArrayList<>();
+    while (!yielder.isDone()) {
+      ScanResultValue srv = yielder.get();
+      columns = columns.isEmpty() ? srv.getColumns() : columns;
+      List<List<Object>> events = (List<List<Object>>) srv.getEvents();
+      for (Object event : events) {
+        if (event instanceof LinkedHashMap) {

Review Comment:
   This is not ideal. The scan result value carries a format type: list or compact list. A list is a list of maps. A compact list is a list of lists. All incoming result values will be one or the other (depending on a setting in the query). This gives us two choices:
   
   * Load all items in their list or map forms, sort using that form, and recreate outgoing batches, or
   * Convert all items to one format (a list say), sort, then convert back (to a map, if needed).
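
   A sketch of the second option, with hypothetical names and plain `java.util` types in place of the scan result classes. It looks values up by column name from the header rather than relying on the map's iteration order, which is an assumption about what the conversion should guarantee.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RowFormatSketch
{
  /** Normalizes an incoming event (map or list) to a list in header order. */
  @SuppressWarnings("unchecked")
  static List<Object> toListRow(List<String> columns, Object event)
  {
    if (event instanceof Map) {
      final Map<String, Object> map = (Map<String, Object>) event;
      final List<Object> row = new ArrayList<>(columns.size());
      for (String column : columns) {
        row.add(map.get(column)); // header order, not map iteration order
      }
      return row;
    }
    return (List<Object>) event;
  }

  /** Converts a sorted list row back to a map when the query asked for maps. */
  static Map<String, Object> toMapRow(List<String> columns, List<Object> row)
  {
    final Map<String, Object> map = new LinkedHashMap<>();
    for (int i = 0; i < columns.size(); i++) {
      map.put(columns.get(i), row.get(i));
    }
    return map;
  }
}
```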



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryOrderByMergeRowIterator.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Ordering;
+import org.apache.druid.collections.QueueBasedSorter;
+import org.apache.druid.collections.Sorter;
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.utils.CollectionUtils;
+
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ScanQueryOrderByMergeRowIterator extends ScanQueryLimitRowIterator
+{
+
+  public ScanQueryOrderByMergeRowIterator(
+      QueryRunner<ScanResultValue> baseRunner,
+      QueryPlus<ScanResultValue> queryPlus,
+      ResponseContext responseContext
+  )
+  {
+    super(baseRunner, queryPlus, responseContext);
+    if (ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR.equals(resultFormat)) {
+      throw new UOE(ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR + " is not supported yet");
+    }
+  }
+
+  @Override
+  public boolean hasNext()
+  {
+    return !yielder.isDone();
+  }
+
+  @Override
+  public ScanResultValue next()
+  {
+
+    final int scanRowsLimit;
+    if (query.getScanRowsLimit() > Integer.MAX_VALUE) {
+      scanRowsLimit = Integer.MAX_VALUE;
+    } else {
+      scanRowsLimit = Math.toIntExact(query.getScanRowsLimit());
+    }
+
+    List<String> orderByDirection = query.getOrderBys()
+                                         .stream()
+                                         .map(orderBy -> orderBy.getOrder().toString())
+                                         .collect(Collectors.toList());
+    List<Integer> sortColumnIdxs = query.getSortColumnIdxs();
+
+    Ordering<Comparable>[] orderings = new Ordering[orderByDirection.size()];
+    for (int i = 0; i < orderByDirection.size(); i++) {
+      orderings[i] = ScanQuery.Order.ASCENDING.equals(ScanQuery.Order.fromString(orderByDirection.get(i)))
+                     ? Comparators.naturalNullsFirst()
+                     : Comparators.<Comparable>naturalNullsFirst().reverse();
+    }
+
+    Comparator<List<Object>> comparator = (o1, o2) -> {
+      for (int i = 0; i < sortColumnIdxs.size(); i++) {
+        int compare = orderings[i].compare(
+            (Comparable) o1.get(sortColumnIdxs.get(i)),
+            (Comparable) o2.get(sortColumnIdxs.get(i))
+        );
+        if (compare != 0) {
+          return compare;
+        }
+      }
+      return 0;
+    };
+    Sorter<Object> sorter = new QueueBasedSorter<Object>(scanRowsLimit, comparator);
+    List<String> columns = new ArrayList<>();
+    while (!yielder.isDone()) {
+      ScanResultValue srv = yielder.get();
+      columns = columns.isEmpty() ? srv.getColumns() : columns;
+      List<List<Object>> events = (List<List<Object>>) srv.getEvents();
+      for (Object event : events) {
+        if (event instanceof LinkedHashMap) {
+          sorter.add(Arrays.asList(((LinkedHashMap) event).values().toArray()));
+        } else {
+          sorter.add((List<Object>) event);
+        }
+      }
+      yielder = yielder.next(null);
+      count++;
+    }
+    final List<List<Object>> sortedElements = new ArrayList<>(sorter.size());

Review Comment:
   As in the earlier note: a scan result value has a maximum batch size. We have to deliver results in batches no larger than that size.
   
   Since "delivery of limited-size batches from a sorter" is common between the merge and scan engine steps, it would be great if we could factor out a common implementation. That way, I don't have to comment on basically the same code twice. DRY and all that.
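
   One way such a shared piece could be shaped, as a sketch under assumptions: the class name is hypothetical, and a plain `Iterator<T>` stands in for whatever the sorter drains. Each call to `next()` returns at most `batchSize` rows, so the caller sees several batches when the row count exceeds the batch size.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

final class BatchingIteratorSketch<T> implements Iterator<List<T>>
{
  private final Iterator<T> sortedRows;
  private final int batchSize;

  BatchingIteratorSketch(Iterator<T> sortedRows, int batchSize)
  {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    this.sortedRows = sortedRows;
    this.batchSize = batchSize;
  }

  @Override
  public boolean hasNext()
  {
    return sortedRows.hasNext();
  }

  @Override
  public List<T> next()
  {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    // Pull rows until the batch is full or the sorted input runs out.
    final List<T> batch = new ArrayList<>();
    while (batch.size() < batchSize && sortedRows.hasNext()) {
      batch.add(sortedRows.next());
    }
    return batch;
  }
}
```

   Both the merge step and the scan engine step could wrap their drained sorter output in this iterator and build one scan result value per batch.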



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] paul-rogers commented on a diff in pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r990736577


##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java:
##########
@@ -275,6 +296,66 @@ Sequence<ScanResultValue> stableLimitingSort(
     }
   }
 
+  Sequence<ScanResultValue> multiColumnSort(
+      Sequence<ScanResultValue> inputSequence,
+      ScanQuery scanQuery
+  ) throws IOException
+  {
+    //In some databases, the final result set size is set to 65535 without setting the limit. We can set the maximum value of Integer here
+    int limit;
+    if (scanQuery.getScanRowsLimit() > Integer.MAX_VALUE) {
+      limit = Integer.MAX_VALUE;
+    } else {
+      limit = Math.toIntExact(scanQuery.getScanRowsLimit());
+    }
+    // Converting the limit from long to int could theoretically throw an ArithmeticException but this branch
+    // only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE)
+    List<String> sortColumns = scanQuery.getOrderBys()
+                                        .stream()
+                                        .map(orderBy -> orderBy.getColumnName())
+                                        .collect(Collectors.toList());
+    Sorter<ScanResultValue> sorter = new QueueBasedSorter<>(limit, scanQuery.getOrderByNoneTimeResultOrdering());
+    Yielder<ScanResultValue> yielder = Yielders.each(inputSequence);
+    try {
+      boolean doneScanning = yielder.isDone();
+      // We need to scan limit elements and anything else in the last segment
+      while (!doneScanning) {
+        ScanResultValue next = yielder.get();
+        List<ScanResultValue> singleEventScanResultValues = next.toSingleEventScanResultValues();
+        for (ScanResultValue srv : singleEventScanResultValues) {
+          // Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list
+          // needs to be preserved for queries using the compactedList result format
+          List events = (List) (srv.getEvents());
+          for (Object event : events) {
+            List<Comparable> sortValues;
+            if (event instanceof LinkedHashMap) {
+              sortValues = sortColumns.stream()
+                                      .map(c -> ((LinkedHashMap<Object, Comparable>) event).get(c))
+                                      .collect(Collectors.toList());
+            } else {
+              sortValues = sortColumns.stream()
+                                      .map(c -> ((List<Comparable>) event).get(srv.getColumns().indexOf(c)))
+                                      .collect(Collectors.toList());
+            }
+            sorter.add(new Sorter.SorterElement<>(srv, sortValues));
+          }
+        }
+        yielder = yielder.next(null);
+        doneScanning = yielder.isDone();
+      }
+      final List<ScanResultValue> sortedElements = new ArrayList<>(sorter.size());
+      Iterators.addAll(sortedElements, sorter.drainElement());

Review Comment:
   There should be a deterministic ordering of the column values in the array. I believe that the select list in the `ScanQuery` provides that order. This is, in fact, the order used in the compact array. As a result, all of the cursors for a single segment should have the same order of columns within the compact array. Within a single segment, there is no danger of one cursor having the column, and another not having it. That issue arises between segments, but that would occur during merge.
   
   So, we can map from the cursor into a "compact list" (that is, a list of arrays) that we combine into a single big list for sorting. When we break the sorted list down into batches, all the batches will also have the same column order within the arrays.
   
   We do have to preserve the headers: I omitted that detail from my long note. The sorter can hold onto the first header it sees, and reuse that header for all batches it creates after sorting.
   
   To verify the assertion that all arrays have the same column positions within the array, on the second and subsequent batches we can spin through the header to verify that the column order is identical to the first header.
   
   If the query requests a (non-compact) list, then we have to convert the array-based rows into maps. To do this, we can use the header we saved above, then loop through each row to create the required map. That's going to be inefficient, so I hope few queries ask for the list-of-maps format!
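
   The header handling described above could be sketched roughly as follows. The class name is hypothetical, and throwing `IllegalStateException` on a mismatch is an assumption about how a violation should surface.

```java
import java.util.ArrayList;
import java.util.List;

final class HeaderTrackerSketch
{
  private List<String> header; // null until the first batch is seen

  /** Remembers the first header and rejects any later header that differs. */
  void accept(List<String> batchColumns)
  {
    if (header == null) {
      header = new ArrayList<>(batchColumns);
    } else if (!header.equals(batchColumns)) {
      throw new IllegalStateException(
          "Column order changed: expected " + header + " but got " + batchColumns
      );
    }
  }

  /** The saved header, reused for every batch produced after sorting. */
  List<String> getHeader()
  {
    return header;
  }
}
```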
   





[GitHub] [druid] 599166320 commented on a diff in pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
599166320 commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r990634715


##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java:
##########
@@ -90,13 +92,27 @@ public QueryRunner<ScanResultValue> mergeRunners(
     // in single thread and in Jetty thread instead of processing thread
     return (queryPlus, responseContext) -> {
       ScanQuery query = (ScanQuery) queryPlus.getQuery();
-      ScanQuery.verifyOrderByForNativeExecution(query);
 
       // Note: this variable is effective only when queryContext has a timeout.
       // See the comment of ResponseContext.Key.TIMEOUT_AT.
       final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery());
       responseContext.putTimeoutTime(timeoutAt);
 
+      if (query.scanOrderByNonTime()) {

Review Comment:
   `scanOrderByNonTime()` is used by `ScanQueryRunnerFactory` and by `ScanQueryQueryToolChest`'s `mergeResults`. I can't think of a better home for it at present, so I have put it in `ScanQuery` temporarily. If there is a more appropriate place, please let me know.





Re: [PR] ScanQuery supports multi column orderBy queries (druid)

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #13168: ScanQuery supports multi column orderBy queries
URL: https://github.com/apache/druid/pull/13168




[GitHub] [druid] 599166320 commented on a diff in pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
599166320 commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r1047363771


##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java:
##########
@@ -252,6 +266,127 @@ public void cleanup(Iterator<ScanResultValue> iterFromMake)
     );
   }
 
+  public Sequence<ScanResultValue> processWithMultiColumnSort(
+      ScanQuery query,
+      boolean legacy,
+      boolean hasTimeout,
+      long timeoutAt,
+      StorageAdapter adapter,
+      List<String> allColumns,
+      List<Interval> intervals,
+      SegmentId segmentId,
+      Filter filter,
+      @Nullable final QueryMetrics<?> queryMetrics
+  )
+  {
+
+    int limit;
+    if (query.getScanRowsLimit() > Integer.MAX_VALUE) {
+      limit = Integer.MAX_VALUE;
+    } else {
+      limit = Math.toIntExact(query.getScanRowsLimit());
+    }
+
+    return Sequences.concat(
+        adapter
+            .makeCursors(
+                filter,
+                intervals.get(0),
+                query.getVirtualColumns(),
+                Granularities.ALL,
+                query.getTimeOrder().equals(ScanQuery.Order.DESCENDING) ||
+                (query.getTimeOrder().equals(ScanQuery.Order.NONE) && query.isDescending()),
+                queryMetrics
+            )
+            .map(cursor -> new BaseSequence<>(
+                new BaseSequence.IteratorMaker<ScanResultValue, Iterator<ScanResultValue>>()
+                {
+                  @Override
+                  public Iterator<ScanResultValue> make()
+                  {
+                    final List<BaseObjectColumnValueSelector> columnSelectors = new ArrayList<>(allColumns.size());
+
+                    for (String column : allColumns) {
+                      final BaseObjectColumnValueSelector selector;
+
+                      if (legacy && LEGACY_TIMESTAMP_KEY.equals(column)) {
+                        selector = cursor.getColumnSelectorFactory()
+                                         .makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
+                      } else {
+                        selector = cursor.getColumnSelectorFactory().makeColumnValueSelector(column);
+                      }
+
+                      columnSelectors.add(selector);
+                    }
+
+                    return new Iterator<ScanResultValue>()
+                    {
+                      @Override
+                      public boolean hasNext()
+                      {
+                        return !cursor.isDone();
+                      }
+
+                      @Override
+                      public ScanResultValue next()
+                      {
+                        if (!hasNext()) {
+                          throw new NoSuchElementException();
+                        }
+                        if (hasTimeout && System.currentTimeMillis() >= timeoutAt) {
+                          throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId()));
+                        }
+                        Sorter<Object> sorter = new QueueBasedSorter<>(limit, query.getOrderByNoneTimeResultOrdering());
+                        rowsToCompactedList(sorter);
+                        final List<List<Object>> sortedElements = new ArrayList<>(sorter.size());
+                        Iterators.addAll(sortedElements, sorter.drainElement());

Review Comment:
   1. I think it is simplest to use the `compactedList` format directly for segment-level sorting.
   If the user's query specifies `resultFormat=RESULT_FORMAT_LIST`, we can convert the compacted list to the non-compacted list format at the last stage of sorting.
   
   2. I also thought about implementing batch-level sorting in the past, but I did not see what it would gain. Is it meant to avoid holding too much data after materialization, which could lead to an OOM?
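
   To illustrate point 1, here is a minimal sketch of converting compacted-list rows into list-format rows at the last stage. `CompactedListConverter` and `toListFormat` are hypothetical names used only for illustration; they are not part of this PR or of Druid.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the PR: converts rows in compacted-list form
// (one List<Object> per row, values in column order) into list form
// (one Map per row, keyed by column name).
final class CompactedListConverter
{
  private CompactedListConverter()
  {
  }

  static List<Map<String, Object>> toListFormat(List<String> columns, List<List<Object>> compactedRows)
  {
    final List<Map<String, Object>> rows = new ArrayList<>(compactedRows.size());
    for (List<Object> compactedRow : compactedRows) {
      if (compactedRow.size() != columns.size()) {
        throw new IllegalArgumentException("Row width does not match column count");
      }
      // LinkedHashMap keeps the column order stable in the converted row.
      final Map<String, Object> row = new LinkedHashMap<>();
      for (int i = 0; i < columns.size(); i++) {
        row.put(columns.get(i), compactedRow.get(i));
      }
      rows.add(row);
    }
    return rows;
  }
}
```

   Sorting on the compacted form and converting only the surviving rows keeps the per-row overhead of the map representation out of the sort itself.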





[GitHub] [druid] 599166320 commented on a diff in pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
599166320 commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r990643536


##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java:
##########
@@ -275,6 +296,66 @@ Sequence<ScanResultValue> stableLimitingSort(
     }
   }
 
+  Sequence<ScanResultValue> multiColumnSort(
+      Sequence<ScanResultValue> inputSequence,
+      ScanQuery scanQuery
+  ) throws IOException
+  {
+    //In some databases, the final result set size is set to 65535 without setting the limit. We can set the maximum value of Integer here
+    int limit;
+    if (scanQuery.getScanRowsLimit() > Integer.MAX_VALUE) {
+      limit = Integer.MAX_VALUE;
+    } else {
+      limit = Math.toIntExact(scanQuery.getScanRowsLimit());
+    }
+    // Converting the limit from long to int could theoretically throw an ArithmeticException but this branch
+    // only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE)
+    List<String> sortColumns = scanQuery.getOrderBys()
+                                        .stream()
+                                        .map(orderBy -> orderBy.getColumnName())
+                                        .collect(Collectors.toList());
+    Sorter<ScanResultValue> sorter = new QueueBasedSorter<>(limit, scanQuery.getOrderByNoneTimeResultOrdering());

Review Comment:
   The existing code has too many anonymous inner classes, which makes it a bit troublesome to test. This change, however, rarely uses anonymous inner classes.





[GitHub] [druid] 599166320 commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
599166320 commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1273554606

   Let me briefly summarize the problems you mentioned above. Based on this PR, I have two things to do next:
   
   1. Implement the special path in code
   
   2. Solve the problem of the same column having different data types in different segments
   
   One more thing, as a digression: Calcite (operator- and planner-based) could be used to improve the query engine.




[GitHub] [druid] paul-rogers commented on a diff in pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r990737600


##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java:
##########
@@ -90,13 +92,27 @@ public QueryRunner<ScanResultValue> mergeRunners(
     // in single thread and in Jetty thread instead of processing thread
     return (queryPlus, responseContext) -> {
       ScanQuery query = (ScanQuery) queryPlus.getQuery();
-      ScanQuery.verifyOrderByForNativeExecution(query);
 
       // Note: this variable is effective only when queryContext has a timeout.
       // See the comment of ResponseContext.Key.TIMEOUT_AT.
       final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery());
       responseContext.putTimeoutTime(timeoutAt);
 
+      if (query.scanOrderByNonTime()) {

Review Comment:
   Fair enough. Let's leave it where it is for now: it can always be moved later.





[GitHub] [druid] 599166320 commented on a diff in pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
599166320 commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r985286087


##########
processing/src/main/java/org/apache/druid/collections/MultiColumnSorter.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.collections;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.Iterator;
+import java.util.List;
+
+public interface MultiColumnSorter<T>

Review Comment:
   @paul-rogers I have added a wrapper interface to the sorter



##########
processing/src/main/java/org/apache/druid/query/scan/OrderByQueryRunner.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
+import org.apache.druid.collections.MultiColumnSorter;
+import org.apache.druid.collections.QueueBasedMultiColumnSorter;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryMetrics;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.query.filter.Filter;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.filter.Filters;
+import org.apache.druid.timeline.SegmentId;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+class OrderByQueryRunner implements QueryRunner<ScanResultValue>

Review Comment:
   @paul-rogers By default, priority-queue-based sorting is used within a segment. The sort columns are materialized in advance: all sort columns are traversed first, and the top-K row offsets are cached in the priority queue. The data to be returned is then extracted with a delayed-materialization strategy, so only the rows corresponding to the top-K offsets need to be read.
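
   The strategy described above can be sketched roughly as follows. This is an illustration under assumptions, not the code in this PR: `TopKOffsets`, `topK`, and `materialize` are hypothetical names, sort keys are assumed to be already read into a list, and a row is fetched through a caller-supplied function.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.IntFunction;

// Hypothetical sketch: keep only the offsets of the best `limit` rows in a
// bounded heap, then read full rows for those offsets afterwards.
final class TopKOffsets
{
  private TopKOffsets()
  {
  }

  // Returns the offsets of the `limit` smallest sort keys, ordered by key.
  static <K> List<Integer> topK(List<K> sortKeys, Comparator<? super K> ordering, int limit)
  {
    if (limit <= 0) {
      return Collections.emptyList();
    }
    final Comparator<Integer> byKey = (a, b) -> ordering.compare(sortKeys.get(a), sortKeys.get(b));
    // Reversed ordering makes the heap head the worst offset currently kept.
    final PriorityQueue<Integer> heap = new PriorityQueue<>(byKey.reversed());
    for (int offset = 0; offset < sortKeys.size(); offset++) {
      if (heap.size() < limit) {
        heap.add(offset);
      } else if (byKey.compare(offset, heap.peek()) < 0) {
        // The new row beats the worst kept row, so it replaces it.
        heap.poll();
        heap.add(offset);
      }
    }
    final List<Integer> result = new ArrayList<>(heap);
    result.sort(byKey);
    return result;
  }

  // Delayed materialization: only rows at the chosen offsets are read.
  static <R> List<R> materialize(List<Integer> offsets, IntFunction<R> rowReader)
  {
    final List<R> rows = new ArrayList<>(offsets.size());
    for (int offset : offsets) {
      rows.add(rowReader.apply(offset));
    }
    return rows;
  }
}
```

   The heap never holds more than `limit` offsets, so memory is bounded by the limit rather than by the segment size, and non-sort columns are touched only for the rows that survive.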



##########
processing/src/main/java/org/apache/druid/query/scan/OrderByQueryRunner.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
+import org.apache.druid.collections.MultiColumnSorter;
+import org.apache.druid.collections.QueueBasedMultiColumnSorter;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryMetrics;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.query.filter.Filter;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.filter.Filters;
+import org.apache.druid.timeline.SegmentId;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+class OrderByQueryRunner implements QueryRunner<ScanResultValue>

Review Comment:
   When using these sorter pairs (`ListBasedOrderByQueryRunner/ListBasedSorterSequence`, `TreeMultisetOrderByQueryRunner/TreeMultisetBasedSorterSequence`), the `sort -> merge -> limit` steps are separated.
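
   As a rough picture of what separated `sort -> merge -> limit` steps look like, here is a generic sketch. It is not the implementation of the runner/sequence pairs named above, whose code is not shown here; `SortMergeLimit` is a hypothetical name, and each inner list stands in for the rows of one segment.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of three separate steps: per-segment sort,
// k-way merge of the sorted segments, and a final limit.
final class SortMergeLimit
{
  private SortMergeLimit()
  {
  }

  static <T> List<T> sortMergeLimit(List<List<T>> segments, Comparator<? super T> ordering, int limit)
  {
    // Step 1 (sort): each segment is sorted independently.
    final List<List<T>> sorted = new ArrayList<>(segments.size());
    for (List<T> segment : segments) {
      final List<T> copy = new ArrayList<>(segment);
      copy.sort(ordering);
      sorted.add(copy);
    }

    // Step 2 (merge): heap entries are {segmentIndex, position}, ordered by
    // the row they point at, so the head is always the next row overall.
    final PriorityQueue<int[]> heads = new PriorityQueue<>(
        (a, b) -> ordering.compare(sorted.get(a[0]).get(a[1]), sorted.get(b[0]).get(b[1]))
    );
    for (int s = 0; s < sorted.size(); s++) {
      if (!sorted.get(s).isEmpty()) {
        heads.add(new int[]{s, 0});
      }
    }

    // Step 3 (limit): stop pulling from the merge once enough rows are out.
    final List<T> out = new ArrayList<>();
    while (!heads.isEmpty() && out.size() < limit) {
      final int[] head = heads.poll();
      final List<T> segment = sorted.get(head[0]);
      out.add(segment.get(head[1]));
      if (head[1] + 1 < segment.size()) {
        heads.add(new int[]{head[0], head[1] + 1});
      }
    }
    return out;
  }
}
```

   By contrast, a priority-queue sorter with a fixed capacity applies the limit while it sorts, which is why the two approaches place the limit at different points.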





[GitHub] [druid] 599166320 commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
599166320 commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1318857322

   Let's ignore the problem reported by Travis CI for now, because it has nothing to do with this PR.
   
   I ran both versions of the sort on the test cluster, executing the same sort against a table with 300 million rows, and found that the improved version (dictionary-based sorting plus rowId-based random access) is 10 times faster than the original.
   




[GitHub] [druid] lgtm-com[bot] commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1325979496

   This pull request **introduces 1 alert** and **fixes 1** when merging a8f86b3d7af9db639b17de7a886e698f2cc91aca into 16385c71011aaf7568345657491d4350c26438ea - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-4a1b275bcfe48942bb0a18e0a609c6b0e47157fd)
   
   **new alerts:**
   
   * 1 for Dereferenced variable may be null
   
   **fixed alerts:**
   
   * 1 for Dereferenced variable may be null
   ---
   
   **Heads-up: LGTM.com's PR analysis will be disabled on the 5th of December**, and LGTM.com will be shut down ⏻ completely on the 16th of December 2022. Please enable GitHub code scanning, which uses the same CodeQL engine :gear: that powers LGTM.com. For more information, please check out [our post on the GitHub blog](https://github.blog/2022-08-15-the-next-step-for-lgtm-com-github-code-scanning/).




[GitHub] [druid] lgtm-com[bot] commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1270454634

   This pull request **fixes 1 alert** when merging 6f1edf3fc68eaefb729ee1a9f00547633010e933 into 0edceead80e0770ed93230fe7e097fe72d731bba - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-2a6d24525c5e0e5f96e6a56366209bf7d0aefeee)
   
   **fixed alerts:**
   
   * 1 for Dereferenced variable may be null




[GitHub] [druid] 599166320 commented on a diff in pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
599166320 commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r990643147


##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java:
##########
@@ -275,6 +296,66 @@ Sequence<ScanResultValue> stableLimitingSort(
     }
   }
 
+  Sequence<ScanResultValue> multiColumnSort(
+      Sequence<ScanResultValue> inputSequence,
+      ScanQuery scanQuery
+  ) throws IOException
+  {
+    //In some databases, the final result set size is set to 65535 without setting the limit. We can set the maximum value of Integer here
+    int limit;
+    if (scanQuery.getScanRowsLimit() > Integer.MAX_VALUE) {
+      limit = Integer.MAX_VALUE;
+    } else {
+      limit = Math.toIntExact(scanQuery.getScanRowsLimit());
+    }
+    // Converting the limit from long to int could theoretically throw an ArithmeticException but this branch
+    // only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE)
+    List<String> sortColumns = scanQuery.getOrderBys()
+                                        .stream()
+                                        .map(orderBy -> orderBy.getColumnName())
+                                        .collect(Collectors.toList());
+    Sorter<ScanResultValue> sorter = new QueueBasedSorter<>(limit, scanQuery.getOrderByNoneTimeResultOrdering());
+    Yielder<ScanResultValue> yielder = Yielders.each(inputSequence);
+    try {
+      boolean doneScanning = yielder.isDone();
+      // We need to scan limit elements and anything else in the last segment
+      while (!doneScanning) {
+        ScanResultValue next = yielder.get();
+        List<ScanResultValue> singleEventScanResultValues = next.toSingleEventScanResultValues();
+        for (ScanResultValue srv : singleEventScanResultValues) {
+          // Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list
+          // needs to be preserved for queries using the compactedList result format
+          List events = (List) (srv.getEvents());
+          for (Object event : events) {
+            List<Comparable> sortValues;
+            if (event instanceof LinkedHashMap) {
+              sortValues = sortColumns.stream()
+                                      .map(c -> ((LinkedHashMap<Object, Comparable>) event).get(c))
+                                      .collect(Collectors.toList());
+            } else {
+              sortValues = sortColumns.stream()
+                                      .map(c -> ((List<Comparable>) event).get(srv.getColumns().indexOf(c)))
+                                      .collect(Collectors.toList());
+            }
+            sorter.add(new Sorter.SorterElement<>(srv, sortValues));

Review Comment:
   I think `SorterElement` is more intuitive. Alternatively, `ScanResultValue` could solve the sorting problem by adding a `Comparable[] orderByColumnValues` field.
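
   The idea of pairing a row with its extracted sort values can be sketched as follows. `RowWithSortKey` is a hypothetical stand-in for illustration, not the `Sorter.SorterElement` class from this PR; the per-column direction flags and the nulls-first rule are assumptions.

```java
import java.util.Comparator;
import java.util.List;

// Hypothetical wrapper: a row plus the values of its order-by columns,
// so the comparator never has to look inside the row itself.
final class RowWithSortKey<T>
{
  private final T row;
  private final List<Comparable<?>> sortValues;

  RowWithSortKey(T row, List<Comparable<?>> sortValues)
  {
    this.row = row;
    this.sortValues = sortValues;
  }

  T getRow()
  {
    return row;
  }

  // ascending[i] is true when the i-th order-by column sorts ascending.
  static <T> Comparator<RowWithSortKey<T>> ordering(boolean[] ascending)
  {
    return (left, right) -> {
      for (int i = 0; i < ascending.length; i++) {
        final int cmp = compareNullsFirst(left.sortValues.get(i), right.sortValues.get(i));
        if (cmp != 0) {
          // The first column that differs decides; later columns break ties.
          return ascending[i] ? cmp : -cmp;
        }
      }
      return 0;
    };
  }

  // Assumes both values in a column have mutually comparable types.
  @SuppressWarnings({"unchecked", "rawtypes"})
  private static int compareNullsFirst(Comparable<?> a, Comparable<?> b)
  {
    if (a == null || b == null) {
      return a == null ? (b == null ? 0 : -1) : 1;
    }
    return ((Comparable) a).compareTo(b);
  }
}
```

   Keeping the sort values beside the row means they are extracted once per row rather than on every comparison, which is the same trade-off as adding an `orderByColumnValues` field to the row type.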





[GitHub] [druid] lgtm-com[bot] commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1272563462

   This pull request **fixes 1 alert** when merging 29a40420cc4381ecfc85f7f331a0d78b22d82a96 into 573e12c75fc0465d38ef37cae092125882f27eeb - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-25fc24b95b01a9c3ca1ee0c8d3a18beb015c9bf8)
   
   **fixed alerts:**
   
   * 1 for Dereferenced variable may be null




Re: [PR] ScanQuery supports multi column orderBy queries (druid)

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1883994963

   This pull request has been marked as stale due to 60 days of inactivity.
   It will be closed in 4 weeks if no further activity occurs. If you think
   that's incorrect or this pull request should instead be reviewed, please simply
   write any comment. Even if closed, you can still revive the PR at any time or
   discuss it on the dev@druid.apache.org list.
   Thank you for your contributions.




[GitHub] [druid] lgtm-com[bot] commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1317560075

   This pull request **introduces 1 alert** and **fixes 1** when merging 138f7fdfcbff860862b95bfe692dbbcfcf89f10b into 78d0b0abce2b27b88a53790684d81530a2318fc9 - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-7e644a0ec0a5441c35deebb7090343c072bf9bde)
   
   **new alerts:**
   
   * 1 for Dereferenced variable may be null
   
   **fixed alerts:**
   
   * 1 for Dereferenced variable may be null
   ---
   
   **Heads-up: LGTM.com's PR analysis will be disabled on the 5th of December**, and LGTM.com will be shut down ⏻ completely on the 16th of December 2022. Please enable GitHub code scanning, which uses the same CodeQL engine :gear: that powers LGTM.com. For more information, please check out [our post on the GitHub blog](https://github.blog/2022-08-15-the-next-step-for-lgtm-com-github-code-scanning/).




[GitHub] [druid] 599166320 commented on a diff in pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
599166320 commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r990633896


##########
processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java:
##########
@@ -496,6 +523,22 @@ public ScanQuery withOverriddenContext(Map<String, Object> contextOverrides)
     return Druids.ScanQueryBuilder.copy(this).context(computeOverriddenContext(getContext(), contextOverrides)).build();
   }
 
+  public boolean scanOrderByNonTime()
+  {
+
+    if (orderBys.size() > 1 || (orderBys.size() == 1 && !ColumnHolder.TIME_COLUMN_NAME.equals(orderBys.get(0).getColumnName()))) {
+      //order by Ordinary column
+      return true;
+    }
+
+    if (orderBys.size() == 1 && ColumnHolder.TIME_COLUMN_NAME.equals(orderBys.get(0).getColumnName()) && getDataSource() instanceof InlineDataSource) {
+      //The type of datasource is inlineDataSource and order by __time
+      return true;
+    }
+
+    return false;

Review Comment:
   1. For inline data sources, any column can be sorted.
   
   2. The last check was wrong. I replaced `return !ColumnHolder.TIME_COLUMN_NAME.equals(orderBys.get(0).getColumnName());` with `return ColumnHolder.TIME_COLUMN_NAME.equals(orderBys.get(0).getColumnName());`.





[GitHub] [druid] 599166320 commented on a diff in pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
599166320 commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r990650192


##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java:
##########
@@ -275,6 +296,66 @@ Sequence<ScanResultValue> stableLimitingSort(
     }
   }
 
+  Sequence<ScanResultValue> multiColumnSort(
+      Sequence<ScanResultValue> inputSequence,
+      ScanQuery scanQuery
+  ) throws IOException
+  {
+    //In some databases, the final result set size is set to 65535 without setting the limit. We can set the maximum value of Integer here
+    int limit;
+    if (scanQuery.getScanRowsLimit() > Integer.MAX_VALUE) {
+      limit = Integer.MAX_VALUE;
+    } else {
+      limit = Math.toIntExact(scanQuery.getScanRowsLimit());
+    }
+    // Converting the limit from long to int could theoretically throw an ArithmeticException but this branch
+    // only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE)
+    List<String> sortColumns = scanQuery.getOrderBys()
+                                        .stream()
+                                        .map(orderBy -> orderBy.getColumnName())
+                                        .collect(Collectors.toList());
+    Sorter<ScanResultValue> sorter = new QueueBasedSorter<>(limit, scanQuery.getOrderByNoneTimeResultOrdering());
+    Yielder<ScanResultValue> yielder = Yielders.each(inputSequence);
+    try {
+      boolean doneScanning = yielder.isDone();
+      // We need to scan limit elements and anything else in the last segment
+      while (!doneScanning) {
+        ScanResultValue next = yielder.get();
+        List<ScanResultValue> singleEventScanResultValues = next.toSingleEventScanResultValues();
+        for (ScanResultValue srv : singleEventScanResultValues) {
+          // Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list
+          // needs to be preserved for queries using the compactedList result format
+          List events = (List) (srv.getEvents());
+          for (Object event : events) {
+            List<Comparable> sortValues;
+            if (event instanceof LinkedHashMap) {
+              sortValues = sortColumns.stream()
+                                      .map(c -> ((LinkedHashMap<Object, Comparable>) event).get(c))
+                                      .collect(Collectors.toList());
+            } else {
+              sortValues = sortColumns.stream()
+                                      .map(c -> ((List<Comparable>) event).get(srv.getColumns().indexOf(c)))
+                                      .collect(Collectors.toList());
+            }
+            sorter.add(new Sorter.SorterElement<>(srv, sortValues));
+          }
+        }
+        yielder = yielder.next(null);
+        doneScanning = yielder.isDone();
+      }
+      final List<ScanResultValue> sortedElements = new ArrayList<>(sorter.size());
+      Iterators.addAll(sortedElements, sorter.drainElement());

Review Comment:
   Would one array be enough? The headers in the result set are ordered, and so are the sort columns. Alternatively, we could add `orderByColumnValues` to `ScanResultValue`, in which case there would be no need for sortedElement. What do you think?





[GitHub] [druid] lgtm-com[bot] commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1270635181

   This pull request **fixes 1 alert** when merging 151c36030da7ba3b6cc8fea71faf8e6dd3c5455b into 0edceead80e0770ed93230fe7e097fe72d731bba - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-541c5649dd7b38a063852c7da67ca01a52df96ce)
   
   **fixed alerts:**
   
   * 1 for Dereferenced variable may be null




[GitHub] [druid] paul-rogers commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1272151595

   @599166320, nice improvements! We can perhaps simplify the code even further.
   
   Scan query is a bit confusing. The `ScanQueryEngine` produces batches of rows (the `ScanResultValue`), but to sort we want individual rows. To mimic the current code, we have to put those rows back into batches for the rest of the query stack. Let's take these one-by-one.
   
   First, let's remind ourselves that, if we sort (even with a limit) we need all the rows. So, we do want to call the thing that runs the engine. The question is, what is the format of the data and what do we do with it?
   
   For better or worse, `ScanQueryEngine` takes the rows from the cursor and wraps them in `ScanResultValue`. Worse, for our purposes, `ScanQueryEngine` maps the rows to the two different scan query result formats. What we really want is single rows in a single, known format. Second best is a batch of rows in a single, known format. That format should be the "compact list" since we'll be storing them in memory.
   
   So, our first task is to convince the `ScanQueryEngine` to give us what we want. We could modify the engine to return rows. But, that code is a bit complex, so we might want to use what we have, at least for now. 
   
   You are already rewriting the query to strip off the sort and limit when we do your custom sort. We can also force the format to `RESULT_FORMAT_COMPACTED_LIST`. This will give us, returned from `ScanQueryEngine.process`, a sequence of `ScanResultValue` objects.
   
   But, we want rows. So, the next step is to unpack the `ScanResultValue` objects into individual rows, and store those in an array, ready for sorting. Your code does this (though see comments.)
   
   Then, we can apply the sort you've developed. Here, we can know that we have rows in compact list form, so each sort key can be translated to a position so we're comparing `row1[posn], row2[posn]`. That's about as fast as it can get.
   
   Now we've got a sorted array. But, we want it potentially limited. So, our effective row count is `min(rowCount, limit)`.
   
   We now have a (perhaps limited) sorted list. The final step is to recreate the `ScanResultValue` objects, potentially converting to the inefficient `RESULT_FORMAT_LIST` if configured. The size of each of the `ScanResultValue` objects is, I believe, controlled by the batch value in the query.
   
   Perhaps all of this can be handled by a single class based on `SimpleSequence()`.
   
   * `make()` reads all the rows, creates the array, sorts it, and returns our "batching iterator".
   * The "batching iterator" `next()` provides the next `batchSize` rows converted to a `ScanResultValue`, stopping when we hit the effective row count computed above.
   
   With that, `ScanQueryRunnerFactory.ScanQueryRunner`:
   
   * Checks if a custom sort is needed using the function you've provided.
   * If not, just returns the sequence from `ScanQueryEngine.process` as today.
   * Else
     * Modifies the query as we discussed.
     * Calls `ScanQueryEngine.process`
     * Hands the returned sequence to the sorter sequence described above
     * Returns the sorter sequence
   
   All this said, there is a bit of code that is pretty close already: `ScanQueryRunnerFactory.stableLimitingSort()`. That method returns a sequence which unpacks events, and sorts them. It is not quite what we want because it uses a priority queue (awkward for large numbers of items, great for merging) and returns rows one-by-one (we want a batch.) Still, it provides some good examples. (You've probably already looked at it since your implementation is similar.)
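   
   To make the unbatch, sort, limit, and re-batch flow described above more concrete, here is a rough sketch. It is only an illustration under stated assumptions: `SortAndRebatchSketch`, `byPositions`, and `sortLimitRebatch` are hypothetical names that do not exist in Druid, rows are modeled as plain `List<Object>` in compact-list form, and only ascending order is handled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper, not part of Druid: sorts unbatched compact-list rows and re-batches them.
final class SortAndRebatchSketch
{
  // Compares two rows on the given column positions, in order (ascending only, for brevity).
  @SuppressWarnings({"unchecked", "rawtypes"})
  static Comparator<List<Object>> byPositions(int[] positions)
  {
    return (row1, row2) -> {
      for (int posn : positions) {
        int cmp = ((Comparable) row1.get(posn)).compareTo(row2.get(posn));
        if (cmp != 0) {
          return cmp;
        }
      }
      return 0;
    };
  }

  // Sorts all rows, applies the limit, and cuts the result into batches of batchSize rows.
  static List<List<List<Object>>> sortLimitRebatch(
      List<List<Object>> rows,
      int[] positions,
      long limit,
      int batchSize
  )
  {
    List<List<Object>> sorted = new ArrayList<>(rows);
    sorted.sort(byPositions(positions));
    // Effective row count is min(rowCount, limit).
    int effectiveRowCount = (int) Math.min(sorted.size(), limit);
    List<List<List<Object>>> batches = new ArrayList<>();
    for (int start = 0; start < effectiveRowCount; start += batchSize) {
      int end = Math.min(start + batchSize, effectiveRowCount);
      batches.add(new ArrayList<>(sorted.subList(start, end)));
    }
    return batches;
  }

  public static void main(String[] args)
  {
    List<List<Object>> rows = Arrays.asList(
        Arrays.<Object>asList("b", 2L),
        Arrays.<Object>asList("a", 3L),
        Arrays.<Object>asList("a", 1L),
        Arrays.<Object>asList("c", 0L)
    );
    // Sort by column 0, then column 1; keep 3 rows; emit batches of 2.
    System.out.println(sortLimitRebatch(rows, new int[]{0, 1}, 3, 2));
  }
}
```

   In a real implementation each inner batch would presumably be wrapped in a `ScanResultValue`; the nested lists here only stand in for that.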
   




[GitHub] [druid] 599166320 commented on a diff in pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
599166320 commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r990654173


##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryConfig.java:
##########
@@ -27,6 +27,7 @@
 {
   public static final String CTX_KEY_MAX_ROWS_QUEUED_FOR_ORDERING = "maxRowsQueuedForOrdering";
   public static final String CTX_KEY_MAX_SEGMENT_PARTITIONS_FOR_ORDERING = "maxSegmentPartitionsOrderedInMemory";
+  public static final String CTX_KEY_QUERY_RUNNER_TYPE = "orderByQueryRunnerType";

Review Comment:
   At first I considered supporting a variety of sorting algorithms, but I can't use them now, so I have deleted them.





[GitHub] [druid] lgtm-com[bot] commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1264738499

   This pull request **introduces 1 alert** and **fixes 1** when merging 5efc2d1fa8ef362db00f45f7dfaa6f9601a8d859 into ebfe1c0c90d86e4d188617fe840dafb2c9b7e5b0 - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-f0ddfc1713ba954289cd611cfac88312e6385e30)
   
   **new alerts:**
   
   * 1 for Dereferenced variable may be null
   
   **fixed alerts:**
   
   * 1 for Dereferenced variable may be null




[GitHub] [druid] lgtm-com[bot] commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1264894115

   This pull request **fixes 1 alert** when merging 858a0b6190d27799306e859a394ec958f8bbef67 into ebfe1c0c90d86e4d188617fe840dafb2c9b7e5b0 - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-415c29438cc37667e25619de7c9ee1b0e7b19b7d)
   
   **fixed alerts:**
   
   * 1 for Dereferenced variable may be null




[GitHub] [druid] 599166320 commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
599166320 commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1270349887

   @paul-rogers 
   I have done the following work in this commit:
   
   1. For the sorting of ordinary columns, when traversing the segment, I prevent the scanquery object from passing the orderByLimit parameter to the cursor. (that is, the simplest solution mentioned above)
   
   2. Improve and add more unit tests
   
   3. Performance optimization mentioned above
   
   
   
   The following points should be noted:
   
   I added an `n-way merge strategy` to ScanQueryRunnerFactory. Eventually, I did not add a limit to the generated Sequence<ScanResultValue> like nWayMergeAndLimit. I don't think it is necessary.
   
   This is because the merge operation of both data nodes `(Historical/Realtime)` and `brokers` will be implemented in ScanQueryOrderByLimitRowIterator.
   
   
   
   Another thing to note is that I didn't put the sorting of `__time` and the sorting of ordinary columns together, because `__time` is special. In Druid, `__time` is actually a special index. Unlike ordinary sorting, it must traverse all sorted data.
   
   
   When you review again, please let me know if there is anything else to improve.




[GitHub] [druid] 599166320 commented on a diff in pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
599166320 commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r1047312824


##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java:
##########
@@ -66,7 +72,12 @@ public Sequence<ScanResultValue> process(
       @Nullable final QueryMetrics<?> queryMetrics
   )
   {
-    if (segment.asQueryableIndex() != null && segment.asQueryableIndex().isFromTombstone()) {
+    QueryableIndex queryableIndex = segment.asQueryableIndex();
+    if (!query.canPushSort() && Objects.nonNull(queryableIndex)) {
+      return new QueryableIndexOrderbyRunner().process(query, segment, responseContext, queryMetrics, queryableIndex);
+    }
+
+    if (queryableIndex != null && queryableIndex.isFromTombstone()) {

Review Comment:
   The tombstone can indicate that the data of this segment does not need to be queried.





[GitHub] [druid] paul-rogers commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1272683109

   Now we can check our understanding of how your feature fits into the stack. There are two parts: sorts and merges. Let's tackle the sort first. There are multiple ways to handle the custom sort. One (undesirable) way is to let the Broker do the work:
   
   ```text
   3. Broker: unbatch results from h historicals into a single array(or list), sort it, and return the sorted results as a sequence
   4. Historical: concat results from s segments into a single sequence
   5. Segment: concat results from c cursors into a single sequence
   6. Cursor: unordered read r rows into b batches
   ```
   
   This works, but it puts unwanted amounts of load (in memory and compute) on the Broker. Better to distribute the sort. My earlier note suggested doing the sort at the segment level -- in part because that is easiest to explain:
   
   ```text
   3. Broker: ordered merge of results from h historicals: merging the rows from `ScanResultValue` batches to create new batches
   4. Historical: as above, for the s segments
   5. Segment: unbatch results from c cursors into a single array(or list), sort it, and return the sorted results as a sequence
   6. Cursor: unordered read r rows into b batches
   ```
   
   The advantage of the above is that if steps 5 and 6 run in a single thread, we do one big sort rather than a bunch of small sorts and a big merge. My hunch is that the single big sort would be faster. Of course, if we did step 6 in separate threads for each cursor, then we'd want to get maximum parallelism and so we'd want to use the approach which I think you prefer:
   
   ```text
   4. Historical: Ordered merge of s segments into a single sequence
   5. Segment: Ordered merge of c cursors into a single sequence
   5.5. Ordered merge of b batches into a single sequence 
   6. Cursor: unordered read r rows into b batches, sort each batch
   ```
   
   There is one more variation possible:
   
   ```text
   5. Segment: Ordered merge of c cursors into a single sequence
   5.5. Combine all r rows into a single list & sort
   6. Cursor: unordered read r rows into b batches
   ```
   
   All of these variations work. There is just a trade-off of the cost of the sort and the cost of the merge. The memory footprint is the same: all rows need to be in memory somewhere in order to be sorted. They are all in one big list (the Broker approach) or in multiple smaller lists (the other approaches.)
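   
   For reference, the "ordered merge" step that appears in the variations above can be sketched as an n-way merge over already-sorted inputs. This is only an illustration: `OrderedMergeSketch` and `Head` are hypothetical names, the inputs are plain in-memory lists rather than Druid sequences, and it is not how Druid implements its merge.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration of an ordered (n-way) merge; not Druid's implementation.
final class OrderedMergeSketch
{
  // Pairs the current head of one sorted input with the rest of that input.
  private static final class Head<T>
  {
    final T value;
    final Iterator<T> rest;

    Head(T value, Iterator<T> rest)
    {
      this.value = value;
      this.rest = rest;
    }
  }

  // Merges already-sorted inputs into one sorted list, holding one head per input in the queue.
  static <T> List<T> merge(List<List<T>> sortedInputs, Comparator<T> ordering)
  {
    Comparator<Head<T>> byHeadValue = (h1, h2) -> ordering.compare(h1.value, h2.value);
    PriorityQueue<Head<T>> queue = new PriorityQueue<>(byHeadValue);
    for (List<T> input : sortedInputs) {
      Iterator<T> it = input.iterator();
      if (it.hasNext()) {
        queue.add(new Head<>(it.next(), it));
      }
    }
    List<T> merged = new ArrayList<>();
    while (!queue.isEmpty()) {
      // Emit the smallest head, then refill the queue from the same input.
      Head<T> smallest = queue.poll();
      merged.add(smallest.value);
      if (smallest.rest.hasNext()) {
        queue.add(new Head<>(smallest.rest.next(), smallest.rest));
      }
    }
    return merged;
  }

  public static void main(String[] args)
  {
    List<List<Integer>> inputs = Arrays.asList(
        Arrays.asList(1, 4, 7),
        Arrays.asList(2, 5),
        Arrays.asList(3, 6, 8)
    );
    System.out.println(merge(inputs, Comparator.<Integer>naturalOrder()));
  }
}
```

   The queue never holds more than one entry per input, which is why a merge of pre-sorted inputs is cheap compared with sorting everything again.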
   
   *Editorial aside: this kind of detailed cost-tradeoff and query planning is best done by software, such as the Calcite planner. Us humans find the above mind-numbingly complex. As a result, the output of a human-generated query plan is often some combination of messy, buggy and sub-optimal performance. This is one reason we're discussing moving to a more traditional operator-and-planner based approach: let the computer do the boring stuff.* 
   




[GitHub] [druid] paul-rogers commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1272674781

   
   One more thing to note here is that Druid allows sorting of segments by more than just `__time`. We can order (I believe) a segment by, say, `(__time, a, b, c)`. In this case, the cursor _should_ be able to sort by any prefix of the segment sort order. That is, it should accept an ordering of `(__time)`, `(__time, a)`, `(__time, a, b)` or `(__time, a, b, c)`. (I have to check the code to verify). If this is true, then the function in this PR that checks if we need an "extra" sort has to take this information into consideration, which means the decision has to occur segment-by-segment since each segment may have a different sort order.
   
   To get even more into the weeds, the cursor should also be able to order by the descending version of any prefix, such as `(__time DESC, a DESC, b DESC)`. The cursor cannot, however, order by a mixture of ASC/DESC that does not match the segment sort order. So, our example segment cannot return rows in an order of, say, `(__time, a DESC)`. We _would_ need the extra sort in this case. (Again, I need to verify: perhaps Druid has a trick, but most systems don't.)
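   
   If the prefix rule above holds (it still needs to be verified against the code), the per-segment check could look roughly like the sketch below. Everything here is hypothetical: `SegmentOrderPrefixSketch`, `Key`, and `cursorCanProvide` are illustration-only names, and the segment sort order is simply passed in as a list rather than read from segment metadata.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical check: can a cursor return rows in the requested order without an extra sort?
final class SegmentOrderPrefixSketch
{
  // One sort key: a column name plus a direction. Hypothetical type for this sketch.
  static final class Key
  {
    final String column;
    final boolean descending;

    Key(String column, boolean descending)
    {
      this.column = column;
      this.descending = descending;
    }
  }

  static Key asc(String column)
  {
    return new Key(column, false);
  }

  static Key desc(String column)
  {
    return new Key(column, true);
  }

  // True if `requested` is a prefix of `segmentOrder`, either as-is or with every direction flipped.
  static boolean cursorCanProvide(List<Key> segmentOrder, List<Key> requested)
  {
    if (requested.size() > segmentOrder.size()) {
      return false;
    }
    boolean sameDirection = true;
    boolean fullyReversed = true;
    for (int i = 0; i < requested.size(); i++) {
      Key seg = segmentOrder.get(i);
      Key req = requested.get(i);
      if (!seg.column.equals(req.column)) {
        return false;
      }
      if (seg.descending == req.descending) {
        fullyReversed = false;
      } else {
        sameDirection = false;
      }
    }
    // A mixture of matching and flipped directions leaves both flags false.
    return sameDirection || fullyReversed;
  }

  public static void main(String[] args)
  {
    List<Key> segmentOrder = Arrays.asList(asc("__time"), asc("a"), asc("b"), asc("c"));
    System.out.println(cursorCanProvide(segmentOrder, Arrays.asList(asc("__time"), asc("a"))));
    System.out.println(cursorCanProvide(segmentOrder, Arrays.asList(asc("__time"), desc("a"))));
  }
}
```

   Because each segment may have a different sort order, a check like this would have to run per segment rather than once per query.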




[GitHub] [druid] 599166320 commented on a diff in pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
599166320 commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r990634715


##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java:
##########
@@ -90,13 +92,27 @@ public QueryRunner<ScanResultValue> mergeRunners(
     // in single thread and in Jetty thread instead of processing thread
     return (queryPlus, responseContext) -> {
       ScanQuery query = (ScanQuery) queryPlus.getQuery();
-      ScanQuery.verifyOrderByForNativeExecution(query);
 
       // Note: this variable is effective only when queryContext has a timeout.
       // See the comment of ResponseContext.Key.TIMEOUT_AT.
       final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery());
       responseContext.putTimeoutTime(timeoutAt);
 
+      if (query.scanOrderByNonTime()) {

Review Comment:
   `scanOrderByNonTime` is used not only by `ScanQueryRunnerFactory`; `mergeResults` in `ScanQueryQueryToolChest` also needs it. I haven't thought of a good solution yet, so for now it lives in `ScanQuery`. If there is a more appropriate place for it, please let me know.





[GitHub] [druid] lgtm-com[bot] commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1271743823

   This pull request **fixes 1 alert** when merging 040d4b0443535c78e319923ef2ef60aef6b1b67c into f89496ccacedc01449fb8ed4e45cf2345cb3ed34 - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-70a46b47ffa36f5258a7bc664bf2805b8af6293e)
   
   **fixed alerts:**
   
   * 1 for Dereferenced variable may be null




[GitHub] [druid] 599166320 commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
599166320 commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1274708403

   @paul-rogers 
   Maybe we don't need to consider the data type. You can use the CAST function to convert to a uniform type, as shown in the following example:
   
   
   ```
   SELECT
     a,b
   FROM table
   WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '1' DAY
   ORDER BY CAST(a AS DOUBLE) DESC,CAST(b AS DOUBLE) ASC
   LIMIT 10
   
   ```
   
   The above query will report the following error:
   
   ```
   Error: Unknown exception
   
   java.lang.Long cannot be cast to java.lang.Integer
   
   java.lang.ClassCastException
   ```
   
   The CAST function can be used to solve the problem of type conversion, such as the following query:
   
   
   ```
   SELECT
     a,b
   FROM table
   WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '1' DAY
   ORDER BY CAST(a AS DOUBLE) DESC,CAST(b AS DOUBLE) ASC
   LIMIT 10
   ```
   
   When type conversion exceptions occur, we can prompt users to use CAST to solve their sorting problems. What do you think?
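   
   As a small illustration of why this kind of exception shows up when sort values of different boxed types are compared directly, and of what a uniform DOUBLE comparison does instead, here is a sketch. The names `MixedNumberCompareSketch`, `rawCompare`, and `AS_DOUBLE` are hypothetical and not part of Druid.

```java
import java.util.Comparator;

// Hypothetical illustration of comparing boxed numbers of different types.
final class MixedNumberCompareSketch
{
  // Compares through the raw Comparable interface, as a generic sorter might do.
  // Throws ClassCastException when the two values have different boxed types.
  @SuppressWarnings({"unchecked", "rawtypes"})
  static int rawCompare(Object left, Object right)
  {
    return ((Comparable) left).compareTo(right);
  }

  // Normalizes both sides to double first, similar in spirit to CAST(x AS DOUBLE).
  static final Comparator<Object> AS_DOUBLE =
      Comparator.comparingDouble(value -> ((Number) value).doubleValue());

  public static void main(String[] args)
  {
    Object longValue = 1L;
    Object intValue = 2;
    try {
      rawCompare(longValue, intValue);
    }
    catch (ClassCastException e) {
      System.out.println("raw compare failed: " + e.getClass().getName());
    }
    System.out.println("compare as double: " + AS_DOUBLE.compare(longValue, intValue));
  }
}
```

   One caveat with this approach: very large long values lose precision when converted to double, so two distinct longs may compare as equal.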




[GitHub] [druid] 599166320 commented on a diff in pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
599166320 commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r1047383221


##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java:
##########
@@ -252,6 +266,127 @@ public void cleanup(Iterator<ScanResultValue> iterFromMake)
     );
   }
 
+  public Sequence<ScanResultValue> processWithMultiColumnSort(
+      ScanQuery query,
+      boolean legacy,
+      boolean hasTimeout,
+      long timeoutAt,
+      StorageAdapter adapter,
+      List<String> allColumns,
+      List<Interval> intervals,
+      SegmentId segmentId,
+      Filter filter,
+      @Nullable final QueryMetrics<?> queryMetrics
+  )
+  {
+
+    int limit;
+    if (query.getScanRowsLimit() > Integer.MAX_VALUE) {
+      limit = Integer.MAX_VALUE;
+    } else {
+      limit = Math.toIntExact(query.getScanRowsLimit());
+    }
+
+    return Sequences.concat(
+        adapter
+            .makeCursors(
+                filter,
+                intervals.get(0),
+                query.getVirtualColumns(),
+                Granularities.ALL,
+                query.getTimeOrder().equals(ScanQuery.Order.DESCENDING) ||
+                (query.getTimeOrder().equals(ScanQuery.Order.NONE) && query.isDescending()),
+                queryMetrics
+            )
+            .map(cursor -> new BaseSequence<>(
+                new BaseSequence.IteratorMaker<ScanResultValue, Iterator<ScanResultValue>>()
+                {
+                  @Override
+                  public Iterator<ScanResultValue> make()
+                  {
+                    final List<BaseObjectColumnValueSelector> columnSelectors = new ArrayList<>(allColumns.size());
+
+                    for (String column : allColumns) {
+                      final BaseObjectColumnValueSelector selector;
+
+                      if (legacy && LEGACY_TIMESTAMP_KEY.equals(column)) {
+                        selector = cursor.getColumnSelectorFactory()
+                                         .makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
+                      } else {
+                        selector = cursor.getColumnSelectorFactory().makeColumnValueSelector(column);
+                      }
+
+                      columnSelectors.add(selector);
+                    }
+
+                    return new Iterator<ScanResultValue>()
+                    {
+                      @Override
+                      public boolean hasNext()
+                      {
+                        return !cursor.isDone();
+                      }
+
+                      @Override
+                      public ScanResultValue next()
+                      {
+                        if (!hasNext()) {
+                          throw new NoSuchElementException();
+                        }
+                        if (hasTimeout && System.currentTimeMillis() >= timeoutAt) {
+                          throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId()));
+                        }
+                        Sorter<Object> sorter = new QueueBasedSorter<>(limit, query.getOrderByNoneTimeResultOrdering());
+                        rowsToCompactedList(sorter);
+                        final List<List<Object>> sortedElements = new ArrayList<>(sorter.size());
+                        Iterators.addAll(sortedElements, sorter.drainElement());
+                        return new ScanResultValue(segmentId.toString(), allColumns, sortedElements);
+                      }
+
+                      @Override
+                      public void remove()
+                      {
+                        throw new UnsupportedOperationException();
+                      }
+
+                      private void rowsToCompactedList(Sorter<Object> sorter)
+                      {
+                        for (; !cursor.isDone(); cursor.advance()) {

Review Comment:
   Each segment corresponds to one cursor, and the number of rows in a segment is bounded.
   The default limit here is 65535. The limit is applied by constructing the `Sorter` with it.
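
To illustrate how a limit-aware sorter can bound memory, here is a minimal sketch of a top-N collector built on `java.util.PriorityQueue`. The class name `BoundedSorter` and its methods are hypothetical; this is not the PR's `QueueBasedSorter`, only an assumption about how such a class could behave.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical helper (not the PR's QueueBasedSorter): keeps at most `limit` smallest rows.
final class BoundedSorter<T>
{
  private final int limit;
  private final Comparator<T> ordering;
  // Max-heap with respect to `ordering`: the head is the worst row currently kept.
  private final PriorityQueue<T> heap;

  BoundedSorter(int limit, Comparator<T> ordering)
  {
    if (limit <= 0) {
      throw new IllegalArgumentException("limit must be positive");
    }
    this.limit = limit;
    this.ordering = ordering;
    this.heap = new PriorityQueue<>(ordering.reversed());
  }

  void add(T row)
  {
    if (heap.size() < limit) {
      heap.offer(row);
    } else if (ordering.compare(row, heap.peek()) < 0) {
      // The new row beats the worst kept row: evict that row and keep the new one.
      heap.poll();
      heap.offer(row);
    }
  }

  // Empties the heap and returns the kept rows in ascending `ordering`.
  List<T> drain()
  {
    List<T> out = new ArrayList<>(heap.size());
    while (!heap.isEmpty()) {
      out.add(heap.poll());
    }
    Collections.reverse(out);
    return out;
  }
}
```

With this shape, memory use is proportional to the smaller of the limit and the number of rows seen, rather than to the full segment.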



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] lgtm-com[bot] commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1279757263

   This pull request **fixes 1 alert** when merging c47de9a0dae11672940b37fd5054203592f85a2c into 3bbb76f17bab32b3d3e12a472e8403affeb09108 - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-94a41d9d0061e88b9f88fd5446f9984b49b8c50e)
   
   **fixed alerts:**
   
   * 1 for Dereferenced variable may be null




[GitHub] [druid] 599166320 commented on a diff in pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
599166320 commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r990792268


##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java:
##########
@@ -275,6 +296,66 @@ Sequence<ScanResultValue> stableLimitingSort(
     }
   }
 
+  Sequence<ScanResultValue> multiColumnSort(
+      Sequence<ScanResultValue> inputSequence,
+      ScanQuery scanQuery
+  ) throws IOException
+  {
+    //In some databases, the final result set size is set to 65535 without setting the limit. We can set the maximum value of Integer here
+    int limit;
+    if (scanQuery.getScanRowsLimit() > Integer.MAX_VALUE) {
+      limit = Integer.MAX_VALUE;
+    } else {
+      limit = Math.toIntExact(scanQuery.getScanRowsLimit());
+    }
+    // Converting the limit from long to int could theoretically throw an ArithmeticException but this branch
+    // only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE)
+    List<String> sortColumns = scanQuery.getOrderBys()
+                                        .stream()
+                                        .map(orderBy -> orderBy.getColumnName())
+                                        .collect(Collectors.toList());
+    Sorter<ScanResultValue> sorter = new QueueBasedSorter<>(limit, scanQuery.getOrderByNoneTimeResultOrdering());
+    Yielder<ScanResultValue> yielder = Yielders.each(inputSequence);
+    try {
+      boolean doneScanning = yielder.isDone();
+      // We need to scan limit elements and anything else in the last segment
+      while (!doneScanning) {
+        ScanResultValue next = yielder.get();
+        List<ScanResultValue> singleEventScanResultValues = next.toSingleEventScanResultValues();
+        for (ScanResultValue srv : singleEventScanResultValues) {
+          // Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list
+          // needs to be preserved for queries using the compactedList result format
+          List events = (List) (srv.getEvents());
+          for (Object event : events) {
+            List<Comparable> sortValues;
+            if (event instanceof LinkedHashMap) {
+              sortValues = sortColumns.stream()
+                                      .map(c -> ((LinkedHashMap<Object, Comparable>) event).get(c))
+                                      .collect(Collectors.toList());
+            } else {
+              sortValues = sortColumns.stream()
+                                      .map(c -> ((List<Comparable>) event).get(srv.getColumns().indexOf(c)))
+                                      .collect(Collectors.toList());
+            }
+            sorter.add(new Sorter.SorterElement<>(srv, sortValues));
+          }
+        }
+        yielder = yielder.next(null);
+        doneScanning = yielder.isDone();
+      }
+      final List<ScanResultValue> sortedElements = new ArrayList<>(sorter.size());
+      Iterators.addAll(sortedElements, sorter.drainElement());

Review Comment:
   Your idea is feasible. I have finished making the change. Thank you for your patience in explaining.





[GitHub] [druid] paul-rogers commented on a diff in pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r990737548


##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java:
##########
@@ -275,6 +296,66 @@ Sequence<ScanResultValue> stableLimitingSort(
     }
   }
 
+  Sequence<ScanResultValue> multiColumnSort(
+      Sequence<ScanResultValue> inputSequence,
+      ScanQuery scanQuery
+  ) throws IOException
+  {
+    //In some databases, the final result set size is set to 65535 without setting the limit. We can set the maximum value of Integer here
+    int limit;
+    if (scanQuery.getScanRowsLimit() > Integer.MAX_VALUE) {
+      limit = Integer.MAX_VALUE;
+    } else {
+      limit = Math.toIntExact(scanQuery.getScanRowsLimit());
+    }
+    // Converting the limit from long to int could theoretically throw an ArithmeticException but this branch
+    // only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE)
+    List<String> sortColumns = scanQuery.getOrderBys()
+                                        .stream()
+                                        .map(orderBy -> orderBy.getColumnName())
+                                        .collect(Collectors.toList());
+    Sorter<ScanResultValue> sorter = new QueueBasedSorter<>(limit, scanQuery.getOrderByNoneTimeResultOrdering());
+    Yielder<ScanResultValue> yielder = Yielders.each(inputSequence);
+    try {
+      boolean doneScanning = yielder.isDone();
+      // We need to scan limit elements and anything else in the last segment
+      while (!doneScanning) {
+        ScanResultValue next = yielder.get();
+        List<ScanResultValue> singleEventScanResultValues = next.toSingleEventScanResultValues();
+        for (ScanResultValue srv : singleEventScanResultValues) {
+          // Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list
+          // needs to be preserved for queries using the compactedList result format
+          List events = (List) (srv.getEvents());
+          for (Object event : events) {
+            List<Comparable> sortValues;
+            if (event instanceof LinkedHashMap) {
+              sortValues = sortColumns.stream()
+                                      .map(c -> ((LinkedHashMap<Object, Comparable>) event).get(c))
+                                      .collect(Collectors.toList());
+            } else {
+              sortValues = sortColumns.stream()
+                                      .map(c -> ((List<Comparable>) event).get(srv.getColumns().indexOf(c)))
+                                      .collect(Collectors.toList());
+            }
+            sorter.add(new Sorter.SorterElement<>(srv, sortValues));

Review Comment:
   When sorting a big list, we want the most efficient per-row storage format available. By storing rows as arrays, rather than wrapping the rows in another class, we have a more compact representation: something that is handy if we have to sort the full five million rows in a typical segment.
   
   The `ScanResultValue` notion is an option. Each `ScanResultValue` contains a batch of rows. To sort the full result set, one solution is to copy all values into one big array, which is what my prior notes assumed. The other way, which you might have in mind, is to sort each `ScanResultValue`, then merge the sorted batches to get the final results. That can work: it might be more efficient than copying the rows into the big array. We'd still store all the rows, but not in one big array.
   
   The question here is the size of the merge. I think each batch in `ScanResultValue` defaults to around 20K rows. If we have to sort a full segment, we'd have 5 million rows, or 5M / 20K = 250 batches. This would result in a pretty wide merge. Of course, most queries will return much smaller result sets.
   
   In any event, whichever choice is selected can be encapsulated in the sorter class described earlier.
   
   In fact, to be really fancy, we can defer the decision until all results arrive. If there is only one batch: sort that and return it (possibly limited). If there are fewer than, say, 10 or 20, sort each and merge. If more, combine batches until we get down to 10 to 20 runs, sort each run, merge the runs, and return the results.
   
   Once we have more than two batches, however, we have to build new output batches that hold the rows merged from multiple input batches in sorted order.
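
The "sort each batch, then merge" option described above can be sketched as a k-way merge over sorted runs. This is only an illustration under stated assumptions: `BatchMergeSort`, `RunCursor`, and `sortAndMerge` are hypothetical names, and rows are modeled as generic values rather than Druid's `ScanResultValue`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch: sort each batch independently, then k-way merge the sorted runs.
final class BatchMergeSort
{
  // Tracks the current read position inside one sorted run.
  private static final class RunCursor<T>
  {
    final List<T> run;
    int pos;

    RunCursor(List<T> run)
    {
      this.run = run;
    }

    T current()
    {
      return run.get(pos);
    }
  }

  static <T> List<T> sortAndMerge(List<List<T>> batches, Comparator<T> ordering, int limit)
  {
    // The heap orders runs by their current head row, so the merge width equals the batch count.
    PriorityQueue<RunCursor<T>> heads =
        new PriorityQueue<>((a, b) -> ordering.compare(a.current(), b.current()));
    for (List<T> batch : batches) {
      List<T> run = new ArrayList<>(batch);
      run.sort(ordering);
      if (!run.isEmpty()) {
        heads.add(new RunCursor<>(run));
      }
    }
    List<T> out = new ArrayList<>();
    while (!heads.isEmpty() && out.size() < limit) {
      RunCursor<T> head = heads.poll();
      out.add(head.current());
      head.pos++;
      if (head.pos < head.run.size()) {
        heads.add(head); // re-insert the run under its new head row
      }
    }
    return out;
  }
}
```

Each output row costs one heap operation over the number of runs, which is why a 250-way merge is noticeably wider than a 10 to 20-way one.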





[GitHub] [druid] 599166320 commented on a diff in pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
599166320 commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r990634715


##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java:
##########
@@ -90,13 +92,27 @@ public QueryRunner<ScanResultValue> mergeRunners(
     // in single thread and in Jetty thread instead of processing thread
     return (queryPlus, responseContext) -> {
       ScanQuery query = (ScanQuery) queryPlus.getQuery();
-      ScanQuery.verifyOrderByForNativeExecution(query);
 
       // Note: this variable is effective only when queryContext has a timeout.
       // See the comment of ResponseContext.Key.TIMEOUT_AT.
       final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery());
       responseContext.putTimeoutTime(timeoutAt);
 
+      if (query.scanOrderByNonTime()) {

Review Comment:
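   `scanOrderByNonTime` is used not only by `ScanQueryRunnerFactory`; `ScanQueryQueryToolChest.mergeResults` needs it as well. I have not yet thought of a better approach, so for now it stays in `ScanQuery`. If there is a more suitable change, please let me know.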



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java:
##########
@@ -90,13 +92,27 @@ public QueryRunner<ScanResultValue> mergeRunners(
     // in single thread and in Jetty thread instead of processing thread
     return (queryPlus, responseContext) -> {
       ScanQuery query = (ScanQuery) queryPlus.getQuery();
-      ScanQuery.verifyOrderByForNativeExecution(query);
 
       // Note: this variable is effective only when queryContext has a timeout.
       // See the comment of ResponseContext.Key.TIMEOUT_AT.
       final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery());
       responseContext.putTimeoutTime(timeoutAt);
 
+      if (query.scanOrderByNonTime()) {

Review Comment:
   `scanOrderByNonTime` is used not only by `ScanQueryRunnerFactory`; `ScanQueryQueryToolChest.mergeResults` also needs it. At present I haven't thought of a good way to handle this, so I have put it in `ScanQuery` for the time being. If there is a more appropriate change, please let me know.





[GitHub] [druid] paul-rogers commented on a diff in pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r990523689


##########
processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java:
##########
@@ -496,6 +523,22 @@ public ScanQuery withOverriddenContext(Map<String, Object> contextOverrides)
     return Druids.ScanQueryBuilder.copy(this).context(computeOverriddenContext(getContext(), contextOverrides)).build();
   }
 
+  public boolean scanOrderByNonTime()
+  {
+
+    if (orderBys.size() > 1 || (orderBys.size() == 1 && !ColumnHolder.TIME_COLUMN_NAME.equals(orderBys.get(0).getColumnName()))) {
+      //order by Ordinary column
+      return true;
+    }
+
+    if (orderBys.size() == 1 && ColumnHolder.TIME_COLUMN_NAME.equals(orderBys.get(0).getColumnName()) && getDataSource() instanceof InlineDataSource) {
+      //The type of datasource is inlineDataSource and order by __time
+      return true;
+    }
+
+    return false;

Review Comment:
   Perhaps clarify the rules a bit:
   
   ```java
   /**
    * Report whether the sort can be pushed into the Cursor, or must be done as a
    * separate sort step.
    */
   public boolean canPushSort()
   {
     // Can push non-existent sort.
     if (orderBys.size() == 0) {
       return true;
     }
     // Cursor can sort by only one column.
     if (orderBys.size() > 1) {
       return false;
     }
     // Inline datasources can't sort
     if (getDataSource() instanceof InlineDataSource) {
       return false;
     }
     // Cursor can't sort by the __time column
     return !ColumnHolder.TIME_COLUMN_NAME.equals(orderBys.get(0).getColumnName());
   }
   ```
   
   I'm not sure of the last two rules. Can an inline datasource do any sorting at all? The PR code suggests it can sort by a single column other than `__time`. Is this right?
   
   Is it also true that the Cursor can sort on any column other than time? I thought that segments are sorted by time, so you get `__time` "for free."



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryConfig.java:
##########
@@ -27,6 +27,7 @@
 {
   public static final String CTX_KEY_MAX_ROWS_QUEUED_FOR_ORDERING = "maxRowsQueuedForOrdering";
   public static final String CTX_KEY_MAX_SEGMENT_PARTITIONS_FOR_ORDERING = "maxSegmentPartitionsOrderedInMemory";
+  public static final String CTX_KEY_QUERY_RUNNER_TYPE = "orderByQueryRunnerType";

Review Comment:
   Is this something the user would pass in?



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java:
##########
@@ -90,13 +92,27 @@ public QueryRunner<ScanResultValue> mergeRunners(
     // in single thread and in Jetty thread instead of processing thread
     return (queryPlus, responseContext) -> {
       ScanQuery query = (ScanQuery) queryPlus.getQuery();
-      ScanQuery.verifyOrderByForNativeExecution(query);
 
       // Note: this variable is effective only when queryContext has a timeout.
       // See the comment of ResponseContext.Key.TIMEOUT_AT.
       final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery());
       responseContext.putTimeoutTime(timeoutAt);
 
+      if (query.scanOrderByNonTime()) {

Review Comment:
   Nit: maybe move the `scanOrderByNonTime()` into this class. The decision about what can be pushed to the `Cursor` and what has to be done by a separate sorter feels more like an implementation-level thing than a query-definition thing.



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java:
##########
@@ -90,13 +92,27 @@ public QueryRunner<ScanResultValue> mergeRunners(
     // in single thread and in Jetty thread instead of processing thread
     return (queryPlus, responseContext) -> {
       ScanQuery query = (ScanQuery) queryPlus.getQuery();
-      ScanQuery.verifyOrderByForNativeExecution(query);
 
       // Note: this variable is effective only when queryContext has a timeout.
       // See the comment of ResponseContext.Key.TIMEOUT_AT.
       final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery());
       responseContext.putTimeoutTime(timeoutAt);
 
+      if (query.scanOrderByNonTime()) {
+        try {
+          return multiColumnSort(
+              Sequences.concat(Sequences.map(
+                  Sequences.simple(Lists.newArrayList(queryRunners)),
+                  input -> input.run(queryPlus, responseContext)
+              )),
+              query
+          );
+        }
+        catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }

Review Comment:
   We're in the `mergeRunners` method, creating the merge step that combines previously-sorted, potentially-limited per-segment (or per-historical) results. So, this is where we want a merger, not a sort. There are merge sequences floating around: we just have to provide the comparator for the multi-column case. See further down in this method for code that creates the merger. (Yes, the names are confusing, and the code is overly complex, but the merger is there.)
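
As a sketch of the comparator a multi-column merger would need, the following compares compacted rows column by column. All names here (`MultiColumnOrdering`, `offsets`, `descending`) are hypothetical, and the null handling (nulls sort first) is an assumption, not something the PR specifies.

```java
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of a multi-column row comparator; names are illustrative only.
final class MultiColumnOrdering
{
  // offsets[i] is the row position of the i-th sort column; descending[i] flips its direction.
  static Comparator<List<Object>> of(int[] offsets, boolean[] descending)
  {
    return (a, b) -> {
      for (int i = 0; i < offsets.length; i++) {
        int cmp = compareValues(a.get(offsets[i]), b.get(offsets[i]));
        if (cmp != 0) {
          // Integer.compare(0, cmp) negates the sign without overflowing.
          return descending[i] ? Integer.compare(0, cmp) : cmp;
        }
      }
      return 0; // all sort columns are equal
    };
  }

  // Assumes non-null values in the same column are mutually Comparable; nulls sort first.
  @SuppressWarnings("unchecked")
  private static int compareValues(Object x, Object y)
  {
    if (x == null || y == null) {
      return x == null ? (y == null ? 0 : -1) : 1;
    }
    return ((Comparable<Object>) x).compareTo(y);
  }
}
```

The same comparator could serve both a per-segment sort and the later merge step, since merging only requires that every input was sorted by the ordering used to merge.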



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java:
##########
@@ -275,6 +296,66 @@ Sequence<ScanResultValue> stableLimitingSort(
     }
   }
 
+  Sequence<ScanResultValue> multiColumnSort(
+      Sequence<ScanResultValue> inputSequence,
+      ScanQuery scanQuery
+  ) throws IOException
+  {
+    //In some databases, the final result set size is set to 65535 without setting the limit. We can set the maximum value of Integer here
+    int limit;
+    if (scanQuery.getScanRowsLimit() > Integer.MAX_VALUE) {
+      limit = Integer.MAX_VALUE;
+    } else {
+      limit = Math.toIntExact(scanQuery.getScanRowsLimit());
+    }
+    // Converting the limit from long to int could theoretically throw an ArithmeticException but this branch
+    // only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE)
+    List<String> sortColumns = scanQuery.getOrderBys()
+                                        .stream()
+                                        .map(orderBy -> orderBy.getColumnName())
+                                        .collect(Collectors.toList());
+    Sorter<ScanResultValue> sorter = new QueueBasedSorter<>(limit, scanQuery.getOrderByNoneTimeResultOrdering());

Review Comment:
   I know Druid loves these anonymous inner classes, but you may find it easier to test if this is made into a named class. The code will also be easier to read as the number of nesting levels will be smaller.



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java:
##########
@@ -275,6 +296,66 @@ Sequence<ScanResultValue> stableLimitingSort(
     }
   }
 
+  Sequence<ScanResultValue> multiColumnSort(
+      Sequence<ScanResultValue> inputSequence,
+      ScanQuery scanQuery
+  ) throws IOException
+  {
+    //In some databases, the final result set size is set to 65535 without setting the limit. We can set the maximum value of Integer here
+    int limit;
+    if (scanQuery.getScanRowsLimit() > Integer.MAX_VALUE) {
+      limit = Integer.MAX_VALUE;
+    } else {
+      limit = Math.toIntExact(scanQuery.getScanRowsLimit());
+    }
+    // Converting the limit from long to int could theoretically throw an ArithmeticException but this branch
+    // only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE)
+    List<String> sortColumns = scanQuery.getOrderBys()
+                                        .stream()
+                                        .map(orderBy -> orderBy.getColumnName())
+                                        .collect(Collectors.toList());
+    Sorter<ScanResultValue> sorter = new QueueBasedSorter<>(limit, scanQuery.getOrderByNoneTimeResultOrdering());
+    Yielder<ScanResultValue> yielder = Yielders.each(inputSequence);
+    try {
+      boolean doneScanning = yielder.isDone();
+      // We need to scan limit elements and anything else in the last segment
+      while (!doneScanning) {
+        ScanResultValue next = yielder.get();
+        List<ScanResultValue> singleEventScanResultValues = next.toSingleEventScanResultValues();
+        for (ScanResultValue srv : singleEventScanResultValues) {
+          // Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list
+          // needs to be preserved for queries using the compactedList result format
+          List events = (List) (srv.getEvents());
+          for (Object event : events) {
+            List<Comparable> sortValues;
+            if (event instanceof LinkedHashMap) {
+              sortValues = sortColumns.stream()
+                                      .map(c -> ((LinkedHashMap<Object, Comparable>) event).get(c))
+                                      .collect(Collectors.toList());
+            } else {
+              sortValues = sortColumns.stream()
+                                      .map(c -> ((List<Comparable>) event).get(srv.getColumns().indexOf(c)))
+                                      .collect(Collectors.toList());
+            }
+            sorter.add(new Sorter.SorterElement<>(srv, sortValues));
+          }
+        }
+        yielder = yielder.next(null);
+        doneScanning = yielder.isDone();
+      }
+      final List<ScanResultValue> sortedElements = new ArrayList<>(sorter.size());
+      Iterators.addAll(sortedElements, sorter.drainElement());

Review Comment:
   Ideally, we'd assemble an array, sort it, and deliver results from it. That will save multiple copies of potentially large arrays. (Fortunately, we are only copying _pointers_, not the full rows.)
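
One way to read the "assemble an array, sort it, deliver results from it" suggestion is sketched below. `ArraySortBuffer` is a hypothetical class, and rows are modeled as `Object[]`; the point is only that references are buffered once and results are served from the sorted array without a further copy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: buffer row references, sort once, then serve results from the sorted array.
final class ArraySortBuffer
{
  private final List<Object[]> rows = new ArrayList<>();

  void add(Object[] row)
  {
    rows.add(row); // stores a reference only; the row itself is not copied
  }

  Iterator<Object[]> sortedIterator(Comparator<Object[]> ordering, int limit)
  {
    Object[][] sorted = rows.toArray(new Object[0][]);
    Arrays.sort(sorted, ordering);
    int end = Math.min(limit, sorted.length);
    // Arrays.asList wraps the sorted array as a view, so no second copy is made here.
    List<Object[]> view = Arrays.asList(sorted);
    return view.subList(0, end).iterator();
  }
}
```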



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryOrderByLimitRowIterator.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Ordering;
+import org.apache.druid.collections.QueueBasedSorter;
+import org.apache.druid.collections.Sorter;
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.context.ResponseContext;
+
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ScanQueryOrderByLimitRowIterator extends ScanQueryLimitRowIterator
+{
+
+  public ScanQueryOrderByLimitRowIterator(
+      QueryRunner<ScanResultValue> baseRunner,
+      QueryPlus<ScanResultValue> queryPlus,
+      ResponseContext responseContext
+  )
+  {
+    super(baseRunner, queryPlus, responseContext);
+  }
+
+  @Override
+  public boolean hasNext()
+  {
+    return !yielder.isDone();
+  }
+
+  @Override
+  public ScanResultValue next()
+  {
+    if (ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR.equals(resultFormat)) {
+      throw new UOE(ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR + " is not supported yet");
+    }
+    //In some databases, the final result set size is set to 65535 without setting the limit. We can set the maximum value of Integer here

Review Comment:
   Maybe leave off the comment about other DBs since, while true, it doesn't apply to this code. What actually happens is that, to avoid the need for "if limit is set do X else do Y" code, we just set the limit to a value larger than any possible segment result size. Since segments should hold rows numbering in the millions, a limit of about 2 billion is pretty safe: no segment will ever be that large.
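
The clamping described above can be written without a branch on "limit set or not". This is a small sketch; `Limits.toIntLimit` is a hypothetical helper, and it assumes an unset limit arrives as a very large `long`.

```java
// Hypothetical helper: an unset limit is assumed to arrive as a very large long value.
final class Limits
{
  // Clamps a long row limit to the int range so that "no limit" becomes Integer.MAX_VALUE.
  static int toIntLimit(long requested)
  {
    if (requested < 0) {
      throw new IllegalArgumentException("limit must be non-negative");
    }
    return (int) Math.min(requested, (long) Integer.MAX_VALUE);
  }
}
```

Because the value is clamped before the cast, the narrowing conversion cannot overflow, so no `ArithmeticException` path is needed.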



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryOrderByLimitRowIterator.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Ordering;
+import org.apache.druid.collections.QueueBasedSorter;
+import org.apache.druid.collections.Sorter;
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.context.ResponseContext;
+
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ScanQueryOrderByLimitRowIterator extends ScanQueryLimitRowIterator
+{
+
+  public ScanQueryOrderByLimitRowIterator(
+      QueryRunner<ScanResultValue> baseRunner,
+      QueryPlus<ScanResultValue> queryPlus,
+      ResponseContext responseContext
+  )
+  {
+    super(baseRunner, queryPlus, responseContext);
+  }
+
+  @Override
+  public boolean hasNext()
+  {
+    return !yielder.isDone();
+  }
+
+  @Override
+  public ScanResultValue next()
+  {
+    if (ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR.equals(resultFormat)) {

Review Comment:
   For performance, we don't want to do this check per row. Also, it can't change per row. Suggestion: do the check during setup, not here.
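
A minimal sketch of moving the check into setup: the format is validated once in the constructor, so `next()` carries no per-row branch. `ValidatedRowIterator` and the `"valueVector"` string are hypothetical placeholders, not the PR's actual class or constant.

```java
import java.util.Iterator;

// Hypothetical sketch: validate the result format once, at construction, not on every next() call.
final class ValidatedRowIterator<T> implements Iterator<T>
{
  private final Iterator<T> delegate;

  ValidatedRowIterator(Iterator<T> delegate, String resultFormat)
  {
    // "valueVector" is an illustrative placeholder for the unsupported format name.
    if ("valueVector".equals(resultFormat)) {
      throw new UnsupportedOperationException(resultFormat + " is not supported yet");
    }
    this.delegate = delegate;
  }

  @Override
  public boolean hasNext()
  {
    return delegate.hasNext();
  }

  @Override
  public T next()
  {
    return delegate.next(); // no format check here; it already ran during setup
  }
}
```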



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java:
##########
@@ -275,6 +296,66 @@ Sequence<ScanResultValue> stableLimitingSort(
     }
   }
 
+  Sequence<ScanResultValue> multiColumnSort(
+      Sequence<ScanResultValue> inputSequence,
+      ScanQuery scanQuery
+  ) throws IOException
+  {
+    //In some databases, the final result set size is set to 65535 without setting the limit. We can set the maximum value of Integer here
+    int limit;
+    if (scanQuery.getScanRowsLimit() > Integer.MAX_VALUE) {
+      limit = Integer.MAX_VALUE;
+    } else {
+      limit = Math.toIntExact(scanQuery.getScanRowsLimit());
+    }
+    // Converting the limit from long to int could theoretically throw an ArithmeticException but this branch
+    // only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE)
+    List<String> sortColumns = scanQuery.getOrderBys()
+                                        .stream()
+                                        .map(orderBy -> orderBy.getColumnName())
+                                        .collect(Collectors.toList());
+    Sorter<ScanResultValue> sorter = new QueueBasedSorter<>(limit, scanQuery.getOrderByNoneTimeResultOrdering());
+    Yielder<ScanResultValue> yielder = Yielders.each(inputSequence);
+    try {
+      boolean doneScanning = yielder.isDone();
+      // We need to scan limit elements and anything else in the last segment
+      while (!doneScanning) {
+        ScanResultValue next = yielder.get();
+        List<ScanResultValue> singleEventScanResultValues = next.toSingleEventScanResultValues();
+        for (ScanResultValue srv : singleEventScanResultValues) {
+          // Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list
+          // needs to be preserved for queries using the compactedList result format
+          List events = (List) (srv.getEvents());
+          for (Object event : events) {
+            List<Comparable> sortValues;
+            if (event instanceof LinkedHashMap) {
+              sortValues = sortColumns.stream()
+                                      .map(c -> ((LinkedHashMap<Object, Comparable>) event).get(c))
+                                      .collect(Collectors.toList());
+            } else {
+              sortValues = sortColumns.stream()
+                                      .map(c -> ((List<Comparable>) event).get(srv.getColumns().indexOf(c)))
+                                      .collect(Collectors.toList());
+            }
+            sorter.add(new Sorter.SorterElement<>(srv, sortValues));

Review Comment:
   I wonder, do we need the `SorterElement` class? I suspect that just having the `Object[]` array of column values, and a set of memoized column offsets, will be sufficient.
   
   Also, it looks like we create a separate `SorterElement` for each batch of events. This is OK if we know that our sort key is `(__time, foo, bar, ...)`: the data is sorted in the segment by `__time`, so we will read rows in `__time` order, and we only need to sort the other columns within the same time value.
   
   But, if our sort key is `(foo, bar)` then we need to sort the full list, across all of the `ScanResultValue` batches returned from all the cursors for this segment.
   
   For now, I wouldn't worry about the leading-`__time` optimization, that can come later.
   
   Note that the unit tests won't uncover this issue because we have too few rows: you won't get multiple cursors or batches in unit tests, sadly.
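   
   To illustrate the point about sorting across batches, here is a minimal sketch, not code from this PR. The class and method names are hypothetical, each row is a plain `Object[]`, and the sort-column offsets are resolved once up front. It assumes non-null, mutually comparable values in each sort column.
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Comparator;
   import java.util.List;

   // Hypothetical sketch: names are illustrative, not Druid APIs.
   class CrossBatchSortSketch
   {
     // Resolve each sort column name to its offset once, not once per row.
     static int[] resolveOffsets(List<String> columns, List<String> sortColumns)
     {
       int[] offsets = new int[sortColumns.size()];
       for (int i = 0; i < offsets.length; i++) {
         offsets[i] = columns.indexOf(sortColumns.get(i));
       }
       return offsets;
     }

     // Compare two rows on the given column offsets, in key order.
     @SuppressWarnings({"unchecked", "rawtypes"})
     static Comparator<Object[]> byOffsets(int[] sortOffsets)
     {
       return (a, b) -> {
         for (int offset : sortOffsets) {
           int cmp = ((Comparable) a[offset]).compareTo(b[offset]);
           if (cmp != 0) {
             return cmp;
           }
         }
         return 0;
       };
     }

     // Gather the rows of every batch first, then sort the combined list.
     static List<Object[]> sortAcrossBatches(List<List<Object[]>> batches, int[] sortOffsets)
     {
       List<Object[]> all = new ArrayList<>();
       for (List<Object[]> batch : batches) {
         all.addAll(batch);
       }
       all.sort(byOffsets(sortOffsets));
       return all;
     }

     public static void main(String[] args)
     {
       List<String> columns = Arrays.asList("__time", "foo", "bar");
       int[] offsets = resolveOffsets(columns, Arrays.asList("foo", "bar"));
       List<List<Object[]>> batches = Arrays.asList(
           Arrays.asList(new Object[]{1L, "b", 2L}, new Object[]{2L, "c", 1L}),
           Arrays.asList(new Object[]{3L, "a", 9L}, new Object[]{4L, "b", 1L})
       );
       for (Object[] row : sortAcrossBatches(batches, offsets)) {
         System.out.println(Arrays.toString(row));
       }
     }
   }
   ```
   
   Sorting each batch on its own would leave the `"a"` row of the second batch behind the rows of the first batch, which is the issue described above.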



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java:
##########
@@ -234,6 +234,7 @@ private Object getColumnValue(int i)
                             final Object value;
 
                             if (legacy && allColumns.get(i).equals(LEGACY_TIMESTAMP_KEY)) {
+                              Preconditions.checkNotNull(selector);

Review Comment:
   This is OK, and will throw an NPE if the selector is null. But the very next line will be pretty good at throwing an NPE anyway if the selector is null.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] abhishekagarwal87 commented on a diff in pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r994141629


##########
sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java:
##########
@@ -1307,7 +1308,7 @@ private ScanQuery toScanQuery()
         );
         return null;
       }
-    }
+    }*/

Review Comment:
   We should just remove this commented code. 





[GitHub] [druid] paul-rogers commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1278483340

   @599166320, I had a chance to discuss this PR with @imply-cheddar, one of the Druid veterans. His suggestion is to build on the design you started with in the previous PR, using the simplifications you've developed in this one. Specifically:
   
   * The sort code should reside in `ScanQueryEngine`, since "that is the thing that implements the scan algorithm", including the custom sorting that you are adding.
   * Use a priority queue, as you did originally, not a sort algorithm as I suggested. That is, emphasize the small-results and limit use cases. Druid is not as often used for large result sets, so the benefits of a sort-the-list approach are not as important as I'd initially thought.
   * Use the existing merges at the Historical and Broker level. Or, as he pointed out, there is only one merge, used twice. Here, it is just a matter of defining an `Ordering` with the proper comparisons. Sounds like you've already handled this task.
   
   In the scan engine, he suggests that you:
   
   * Read the individual rows.
   * Put them into a limited-sized priority queue.
   * Create batches only when you pull them out of the priority queue.
   
   The result is that `ScanQueryEngine.process` produces a `Sequence` of `ScanResultValue` objects as today, only now they are sorted.
   
   He also notes that we have several existing priority-queue based merge implementations you can borrow. It looks like you have, in fact, already done that.
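   
   For reference, the read-rows, bounded-queue, batch-on-the-way-out steps listed above can be sketched roughly as follows. This is only an illustration under assumptions: `topN` and `toBatches` are hypothetical names, rows are generic values rather than real scan rows, and none of this is taken from the existing Druid merge implementations.
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.Comparator;
   import java.util.List;
   import java.util.PriorityQueue;

   // Hypothetical sketch: names are illustrative, not Druid APIs.
   class LimitedPriorityQueueSketch
   {
     // Keep only the first `limit` rows under `ordering` and return them sorted.
     static <T> List<T> topN(Iterable<T> rows, Comparator<T> ordering, int limit)
     {
       if (limit <= 0) {
         return new ArrayList<>();
       }
       // Reversed ordering: the head of the heap is the worst row kept so far.
       PriorityQueue<T> heap = new PriorityQueue<>(limit, ordering.reversed());
       for (T row : rows) {
         if (heap.size() < limit) {
           heap.add(row);
         } else if (ordering.compare(row, heap.peek()) < 0) {
           heap.poll();
           heap.add(row);
         }
       }
       // Polling yields worst-to-best, so reverse to get the requested order.
       List<T> result = new ArrayList<>(heap.size());
       while (!heap.isEmpty()) {
         result.add(heap.poll());
       }
       Collections.reverse(result);
       return result;
     }

     // Create batches only when rows come out of the queue. Assumes batchSize > 0.
     static <T> List<List<T>> toBatches(List<T> sorted, int batchSize)
     {
       List<List<T>> batches = new ArrayList<>();
       for (int i = 0; i < sorted.size(); i += batchSize) {
         int end = Math.min(i + batchSize, sorted.size());
         batches.add(new ArrayList<>(sorted.subList(i, end)));
       }
       return batches;
     }

     public static void main(String[] args)
     {
       List<Integer> rows = Arrays.asList(5, 1, 4, 2, 3);
       List<Integer> top = topN(rows, Comparator.<Integer>naturalOrder(), 3);
       System.out.println(toBatches(top, 2));
     }
   }
   ```
   
   Memory stays bounded by the limit rather than by the number of rows scanned, which is why this shape suits the small-results and limit use cases.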
   
   We discussed the idea of pushing sorts into the cursor. @imply-cheddar notes that it is uncommon to have sorts of the form `(__time, a, b)`. So, it is perfectly fine to assume, as you have done, that if the sort has multiple columns, or is a single column other than `__time`, the code uses your sort. No need to worry about the occasional odd multi-column sort that starts with `__time`.
   
   Since all the code is in `ScanQueryEngine`, no changes are needed to `ScanQueryRunnerFactory`. Basically, the logic we'd been discussing to put there will shift into `ScanQueryEngine` instead.
   
   These all sound like fine improvements that help this feature be more "Druid-like." Perhaps @imply-cheddar will chime in with additional suggestions.
   
   My own suggestion is that, to limit code complexity, create two paths in `ScanQueryEngine`: the existing one and the custom sort one. Otherwise, trying to do them both in the same method could be a bit messy.
   
   Thanks for the notes earlier about the other areas. Sounds like we're in good shape there.
   
   Once you are ready, I'll take another look.




Re: [PR] ScanQuery supports multi column orderBy queries (druid)

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1933154537

   This pull request/issue has been closed due to lack of activity. If you think that
   is incorrect, or the pull request requires review, you can revive the PR at any time.




[GitHub] [druid] paul-rogers commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1279187078

   @imply-cheddar suggested some additional optimizations, though these can come later. Recall that an earlier cut of the PR attempted to do the sort in several phases to minimize the memory footprint:
   
   * Read just the sort keys into "rows" used for sorting
   * Sort the key-only rows
   * Use random seeks into the cursor to fetch the full set of columns when returning rows
   
   The idea was to read only those columns under the limit. @imply-cheddar explained that this is possible, just not using `Cursor`. It can be done using `QueryableIndex` which provides access to the individual columns and would allow random reads.
   
   The other suggestion was to exploit indexes for string columns. The notion here is that column indexes are stored sorted. Each index entry consists of a `(dictionaryId, string)` pair. Since the dictionaries are sorted, the `dictionaryId`s are in the same order as the keys. A sort can thus compare `dictionaryId`s rather than the actual strings, which should increase performance.
   
   The sort code can either store only the `dictionaryId`s in the sort key, sort the data, and fetch the string later, or it can store both the `dictionaryId` and string for each row, but discard the `dictionaryId` when returning the row downstream.
   
   It is fine if a first cut at the sort just stores the strings. Because of the dictionary, the strings will be "interned": there will be only one copy of each distinct value. This means that the rows stored in the priority queue will take somewhat less memory than they would if we stored copies of each string value per row.
   
   Again, a suggestion is to get the basics working first. Then, once we can declare victory on that, there can be an optional second PR that uses the above tricks to further optimize the sort.
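   
   A toy sketch of the dictionary idea, for illustration only: a sorted `String[]` stands in for the column dictionary and an `int[]` for the per-row `dictionaryId`s. These are assumptions for the example, not the actual Druid column or `QueryableIndex` API.
   
   ```java
   import java.util.Arrays;

   // Hypothetical sketch: plain arrays stand in for a dictionary-encoded column.
   class DictionaryIdSortSketch
   {
     // Sort rows by dictionary id, then look each id up to recover the string.
     static String[] sortByDictionaryId(String[] sortedDictionary, int[] rowIds)
     {
       int[] ids = rowIds.clone();
       Arrays.sort(ids); // integer comparisons only, no string comparisons
       String[] result = new String[ids.length];
       for (int i = 0; i < ids.length; i++) {
         // Every row with the same id shares one String instance.
         result[i] = sortedDictionary[ids[i]];
       }
       return result;
     }

     public static void main(String[] args)
     {
       String[] dictionary = {"apple", "banana", "cherry"}; // must be sorted
       int[] rowIds = {2, 0, 1, 0};
       System.out.println(Arrays.toString(sortByDictionaryId(dictionary, rowIds)));
     }
   }
   ```
   
   This works only because the dictionary itself is sorted, so id order and string order agree.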




[GitHub] [druid] lgtm-com[bot] commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1270854861

   This pull request **fixes 1 alert** when merging 559f80a0452ebcbf352b50b52d1e85927f696ffe into f89496ccacedc01449fb8ed4e45cf2345cb3ed34 - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-8a69f09550467819538c423b88540c0e138d245b)
   
   **fixed alerts:**
   
   * 1 for Dereferenced variable may be null




[GitHub] [druid] paul-rogers commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1265744160

   @599166320, thanks for the update! I'll take another look soon.
   
   I noticed that this is a new PR that replaces the existing, closed one. As it turns out, you can just reuse the same PR by pushing (or force-pushing) new commits. Reusing the same PR allows all comments to be in one place. If the PR is closed, you can reopen it and be back in business. Let's use this PR from now on for this project. Please reach out if you need help: doing this PR stuff can be tricky the first time you do it. 




[GitHub] [druid] paul-rogers commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1275427976

   @599166320, you asked about the "special path" when the sort key is a prefix of the segment sort order. If this optimization does not exist today, it need not be done as part of this PR. It can be done as a follow-on refinement. I'd have to poke around in the scan query (and cursor, and storage adapter) code to see how this is handled in other query types. We don't have to get distracted here with that task.
   
   Thanks for summarizing the tasks. You noted:
   
   > Let me briefly summarize the problems you mentioned above. Based on this PR, I have two things to do next:
   >
   > Code implementation of special path(Segment by segment decision)
   >
   > Solve the problem of different data types in the same column of different segments
   >
   > Another thing: use Calcite (operator and planner based) to improve the query engine, which is what you plan to do later.
   
   This sounds right. The Calcite planner part is outside the scope of this PR (though whatever we do will use the sorter you create here.) The special path can be outside this PR if we don't already have a way to check this case, as noted above.
   
   That leaves several broad tasks:
   
   * Decide the level at which to sort. (Per-batch, per-cursor, or per-segment.)
   * Finish up the sorter for this case. The sorter can assume all values for a column have the same type since we're in a single segment. Handle the batching issues discussed earlier.
   * Finish up the merge code that is aware of the sort keys. This code should exist somewhere if we do support sorting other than just by `__time` in any query type. (The code may be specific to some other row format, however.)
   * Deal with the mixed-type issue we noted. If the above merge code exists, then it should already have a solution.
   * Test with a data set that spans multiple segments to ensure all the levels listed earlier work as expected.
   




[GitHub] [druid] 599166320 commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
599166320 commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1273527315

   > The other thing to discuss is the ordered merge steps. As it turns out, the need to do the ordered merge is independent of how we do the sort. It doesn't matter if the cursor did the sort for us, or if we added a custom sort. In both cases, we have to generate a merge comparator that has the sort keys, with ASC/DESC sort sense, in the proper order.
   > 
   > Merging requires two implementations: one for the "list" (i.e. list of map) format, the other for the "compact list" (i.e. list of array) format. We can assume that the compact-list rows for any one segment have the same set of columns, and that map-based rows have the same keys. That is, the set of column names will be whatever the query requests. The order of columns in the compact-list form _should_ (we should check) be driven by the order that columns appear in the query.
   > 
   > But, we must anticipate that the _types_ of columns from different segments may differ. Druid allows column `c` to be a `long` in one segment, a `string` in another. I believe Druid has some rules for reconciling these type conflicts. This means that the comparator has to know how to compare `10` and `"foo"`. There may already be implementations for this in the existing comparators: we should check.
   
   Different segments do have different data types in the same column. Let me see how to solve this problem.
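   
   As a starting point for the merge comparator described in the quoted text, here is a rough sketch with one comparator per result format. Everything here is hypothetical (`SortKey`, `forMaps`, `forCompactLists`), and it deliberately assumes that both sides of a comparison have the same type for a given column. The mixed-type case is left open and would fail with a `ClassCastException` in this sketch.
   
   ```java
   import java.util.Arrays;
   import java.util.Comparator;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical sketch: names are illustrative, not Druid APIs.
   class MergeOrderingSketch
   {
     static final class SortKey
     {
       final String column;
       final boolean descending;

       SortKey(String column, boolean descending)
       {
         this.column = column;
         this.descending = descending;
       }
     }

     // Nulls sort first; non-null values must be mutually comparable.
     @SuppressWarnings({"unchecked", "rawtypes"})
     static int compareValues(Object a, Object b)
     {
       if (a == null || b == null) {
         return a == null ? (b == null ? 0 : -1) : 1;
       }
       return ((Comparable) a).compareTo(b);
     }

     // "list" format: each row is a map keyed by column name.
     static Comparator<Map<String, Object>> forMaps(List<SortKey> keys)
     {
       return (left, right) -> {
         for (SortKey key : keys) {
           Object l = left.get(key.column);
           Object r = right.get(key.column);
           int cmp = key.descending ? compareValues(r, l) : compareValues(l, r);
           if (cmp != 0) {
             return cmp;
           }
         }
         return 0;
       };
     }

     // "compactedList" format: each row is a list in `columns` order.
     static Comparator<List<Object>> forCompactLists(List<String> columns, List<SortKey> keys)
     {
       int[] offsets = new int[keys.size()];
       for (int i = 0; i < offsets.length; i++) {
         offsets[i] = columns.indexOf(keys.get(i).column);
       }
       return (left, right) -> {
         for (int i = 0; i < offsets.length; i++) {
           Object l = left.get(offsets[i]);
           Object r = right.get(offsets[i]);
           int cmp = keys.get(i).descending ? compareValues(r, l) : compareValues(l, r);
           if (cmp != 0) {
             return cmp;
           }
         }
         return 0;
       };
     }

     public static void main(String[] args)
     {
       List<SortKey> keys = Arrays.asList(new SortKey("foo", false), new SortKey("bar", true));
       Map<String, Object> a = new HashMap<>();
       a.put("foo", "x");
       a.put("bar", 1L);
       Map<String, Object> b = new HashMap<>();
       b.put("foo", "x");
       b.put("bar", 2L);
       // Positive result: b sorts before a because bar is descending.
       System.out.println(forMaps(keys).compare(a, b));
     }
   }
   ```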




[GitHub] [druid] lgtm-com[bot] commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1314687494

   This pull request **introduces 1 alert** and **fixes 1** when merging e0c798354a57cca16ccd9a91c33a358e52667845 into 309cae7b65e65628f3b277808717a0c7e0bdaccf - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-1bf1926c97d38d8bd89ea38579304f658b9de629)
   
   **new alerts:**
   
   * 1 for Dereferenced variable may be null
   
   **fixed alerts:**
   
   * 1 for Dereferenced variable may be null
   ---
   
   **Heads-up: LGTM.com's PR analysis will be disabled on the 5th of December**, and LGTM.com will be shut down ⏻ completely on the 16th of December 2022. Please enable GitHub code scanning, which uses the same CodeQL engine :gear: that powers LGTM.com. For more information, please check out [our post on the GitHub blog](https://github.blog/2022-08-15-the-next-step-for-lgtm-com-github-code-scanning/).




[GitHub] [druid] 599166320 commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
599166320 commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1314044310

   @paul-rogers Sorry, I have been busy with other things recently. 
   
   I have now completed a preliminary version of the dictionary ID sorting and rowId random-access improvements.
   
   If you have time, please review it and let me know what can be improved.




[GitHub] [druid] 599166320 commented on a diff in pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
599166320 commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r1048216373


##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryOrderBySequence.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.common.guava.GuavaUtils;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.BaseSequence;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.AbstractPrioritizedQueryRunnerCallable;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.QueryInterruptedException;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryProcessingPool;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryTimeoutException;
+import org.apache.druid.query.context.ResponseContext;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Scan the segments in parallel, complete the sorting of each batch within each segment, and then complete the sorting of each segment level
+ */

Review Comment:
   
   I checked it by printing the log:
   
   ```
   2022-12-14T07:34:57,759 INFO [qtp1231864343-87] org.apache.druid.server.QueryResource - ThreadId:87,ThreadName:qtp1231864343-87
   ...
   2022-12-14T07:35:03,791 INFO [scan_wikipedia_[2016-06-27T00:00:00.000Z/2016-06-27T01:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T00:00:00.000Z/2016-06-27T01:00:00.000Z]
   2022-12-14T07:35:03,807 INFO [scan_wikipedia_[2016-06-27T01:00:00.000Z/2016-06-27T02:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T01:00:00.000Z/2016-06-27T02:00:00.000Z]
   2022-12-14T07:35:03,849 INFO [scan_wikipedia_[2016-06-27T02:00:00.000Z/2016-06-27T03:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T02:00:00.000Z/2016-06-27T03:00:00.000Z]
   2022-12-14T07:35:03,854 INFO [scan_wikipedia_[2016-06-27T03:00:00.000Z/2016-06-27T04:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T03:00:00.000Z/2016-06-27T04:00:00.000Z]
   2022-12-14T07:35:03,861 INFO [scan_wikipedia_[2016-06-27T04:00:00.000Z/2016-06-27T05:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T04:00:00.000Z/2016-06-27T05:00:00.000Z]
   2022-12-14T07:35:03,881 INFO [scan_wikipedia_[2016-06-27T05:00:00.000Z/2016-06-27T06:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T05:00:00.000Z/2016-06-27T06:00:00.000Z]
   2022-12-14T07:35:03,888 INFO [scan_wikipedia_[2016-06-27T06:00:00.000Z/2016-06-27T07:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T06:00:00.000Z/2016-06-27T07:00:00.000Z]
   2022-12-14T07:35:03,896 INFO [scan_wikipedia_[2016-06-27T07:00:00.000Z/2016-06-27T08:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T07:00:00.000Z/2016-06-27T08:00:00.000Z]
   2022-12-14T07:35:03,909 INFO [scan_wikipedia_[2016-06-27T08:00:00.000Z/2016-06-27T09:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T08:00:00.000Z/2016-06-27T09:00:00.000Z]
   2022-12-14T07:35:03,919 INFO [scan_wikipedia_[2016-06-27T09:00:00.000Z/2016-06-27T10:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T09:00:00.000Z/2016-06-27T10:00:00.000Z]
   2022-12-14T07:35:03,927 INFO [scan_wikipedia_[2016-06-27T10:00:00.000Z/2016-06-27T11:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T10:00:00.000Z/2016-06-27T11:00:00.000Z]
   2022-12-14T07:35:03,932 INFO [scan_wikipedia_[2016-06-27T11:00:00.000Z/2016-06-27T12:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T11:00:00.000Z/2016-06-27T12:00:00.000Z]
   2022-12-14T07:35:03,936 INFO [scan_wikipedia_[2016-06-27T12:00:00.000Z/2016-06-27T13:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T12:00:00.000Z/2016-06-27T13:00:00.000Z]
   2022-12-14T07:35:03,939 INFO [scan_wikipedia_[2016-06-27T13:00:00.000Z/2016-06-27T14:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T13:00:00.000Z/2016-06-27T14:00:00.000Z]
   2022-12-14T07:35:03,951 INFO [scan_wikipedia_[2016-06-27T14:00:00.000Z/2016-06-27T15:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T14:00:00.000Z/2016-06-27T15:00:00.000Z]
   2022-12-14T07:35:03,955 INFO [scan_wikipedia_[2016-06-27T15:00:00.000Z/2016-06-27T16:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T15:00:00.000Z/2016-06-27T16:00:00.000Z]
   2022-12-14T07:35:03,975 INFO [scan_wikipedia_[2016-06-27T16:00:00.000Z/2016-06-27T17:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T16:00:00.000Z/2016-06-27T17:00:00.000Z]
   2022-12-14T07:35:03,984 INFO [scan_wikipedia_[2016-06-27T17:00:00.000Z/2016-06-27T18:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T17:00:00.000Z/2016-06-27T18:00:00.000Z]
   2022-12-14T07:35:03,994 INFO [scan_wikipedia_[2016-06-27T18:00:00.000Z/2016-06-27T19:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine - ThreadId:87,ThreadName:scan_wikipedia_[2016-06-27T18:00:00.000Z/2016-06-27T19:00:00.000Z]
   2022-12-14T07:35:03,998 INFO [scan_wikipedia_[2016-06-27T19:00:00.000Z/2016-06-27T20:00:00.000Z]] org.apache.druid.query.scan.ScanQueryEngine 
   ```
   
   The logs above show that the ThreadId of the scan query is always 87. This shows that even when multiple segments are queried, the scan query is executed by a single thread.
   
   
   For this improved version, I followed the parallelism model of `TopNQuery`, which I will briefly describe:
   
   When the client submits a query, Druid divides it into multiple subquery tasks at segment granularity, with each subquery task corresponding to one segment. When the query is actually executed, the subquery tasks are placed in the queue of the `queryProcessingPool`, whose threads compute the result sets in parallel. After each subquery finishes, its results are pushed to a priority queue, which merges the result sets of the subqueries. Resource usage during the whole computation is controlled by the `queryProcessingPool` and the limit parameter. As long as the pool is configured properly (`druid.processing.numThreads`), I think this avoids excessive memory usage, frequent thread creation, and thread contention.
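   
   A simplified sketch of this model, using only the JDK and not the real `QueryProcessingPool` API: a fixed-size `ExecutorService` stands in for the processing pool, each "segment" is just a list of `Long` values, and `scanAndMerge` is a hypothetical name. It assumes `limit >= 0` and `numThreads >= 1`.
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Comparator;
   import java.util.List;
   import java.util.PriorityQueue;
   import java.util.concurrent.Callable;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;

   // Hypothetical sketch: a plain thread pool stands in for the processing pool.
   class ParallelSegmentScanSketch
   {
     static List<Long> scanAndMerge(List<List<Long>> segments, int numThreads, int limit)
     {
       ExecutorService pool = Executors.newFixedThreadPool(numThreads);
       try {
         // One subquery task per segment; each sorts its rows and applies the limit.
         List<Future<List<Long>>> futures = new ArrayList<>();
         for (List<Long> segment : segments) {
           Callable<List<Long>> task = () -> {
             List<Long> sorted = new ArrayList<>(segment);
             sorted.sort(Comparator.naturalOrder());
             return new ArrayList<>(sorted.subList(0, Math.min(limit, sorted.size())));
           };
           futures.add(pool.submit(task));
         }
         // Merge the per-segment results through a priority queue.
         PriorityQueue<Long> merged = new PriorityQueue<>();
         for (Future<List<Long>> future : futures) {
           merged.addAll(future.get());
         }
         List<Long> result = new ArrayList<>();
         while (result.size() < limit && !merged.isEmpty()) {
           result.add(merged.poll());
         }
         return result;
       }
       catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new RuntimeException(e);
       }
       catch (ExecutionException e) {
         throw new RuntimeException(e.getCause());
       }
       finally {
         pool.shutdown();
       }
     }

     public static void main(String[] args)
     {
       List<List<Long>> segments = Arrays.asList(
           Arrays.asList(5L, 1L),
           Arrays.asList(4L, 2L),
           Arrays.asList(3L)
       );
       System.out.println(scanAndMerge(segments, 2, 3));
     }
   }
   ```
   
   Because each task returns at most `limit` rows, the merge step holds at most `limit` rows per segment at once.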
   
   





[GitHub] [druid] lgtm-com[bot] commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1318089790

   This pull request **introduces 1 alert** and **fixes 1** when merging d5406c0fe201172b5966558c3e1d3dacb6434ebc into 78d0b0abce2b27b88a53790684d81530a2318fc9 - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-aca96251312d1591cf12afe80480832d1e7f8e85)
   
   **new alerts:**
   
   * 1 for Dereferenced variable may be null
   
   **fixed alerts:**
   
   * 1 for Dereferenced variable may be null




[GitHub] [druid] 599166320 commented on a diff in pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
599166320 commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r985289315


##########
processing/src/main/java/org/apache/druid/query/scan/OrderByQueryRunner.java:
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
+import org.apache.druid.collections.MultiColumnSorter;
+import org.apache.druid.collections.QueueBasedMultiColumnSorter;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryMetrics;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.query.filter.Filter;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.filter.Filters;
+import org.apache.druid.timeline.SegmentId;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+class OrderByQueryRunner implements QueryRunner<ScanResultValue>

Review Comment:
   When using these `QueryRunner`/`Sequence` pairs (`ListBasedOrderByQueryRunner`/`ListBasedSorterSequence`, `TreeMultisetOrderByQueryRunner`/`TreeMultisetBasedSorterSequence`), the `sort -> merge -> limit` steps are kept separate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] paul-rogers commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1275431852

   On the mixed-type issue:
   
   > You can use CAST function to convert to a uniform type.
   
   Indeed. This is the solution that the Catalog project proposes. Rather than do the cast in each and every query, tell the catalog the preferred type and the cast will be done automagically.
   
   So, for the purposes of this PR, we can require that all column values are of a single type. Check if the type varies, and if so, throw an exception. The user works around the issue with the cast (or, later, with a catalog entry.) If Druid has some magic, that can be added as a follow-up.
   
   One way to check that the schemas are identical is to check if the schema of the next batch to arrive at the merger is the same as the current batch. And, when we emit values from the merge, we use that same schema. Of course, we actually only care about the types of the sort key columns, so we can be a bit more lenient and only compare the key column types.
   
   A side note about the "type conflict magic". In other projects, we failed to find the magic. If you have `VARCHAR` and `BIGINT`, which type wins? What if you saw 10K of the `VARCHAR` rows before the first `BIGINT` row? Or vice versa? Given this, the solution that requires the user to break the tie is reasonable.
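
The key-column check described above could look roughly like the following. This is only a sketch under stated assumptions: rows use the map-based "list" format, the Java class of a value stands in for the column type, types are uniform within one batch, and `SortKeyTypeCheck`, `keyTypes`, and `verifySameKeyTypes` are hypothetical names that do not exist in Druid.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: none of these names come from the Druid code base.
class SortKeyTypeCheck
{
  /** Maps each sort-key column to the Java class of its first non-null value in the batch. */
  static Map<String, Class<?>> keyTypes(List<Map<String, Object>> batch, List<String> sortKeys)
  {
    Map<String, Class<?>> types = new LinkedHashMap<>();
    for (Map<String, Object> row : batch) {
      for (String key : sortKeys) {
        Object value = row.get(key);
        if (value != null) {
          types.putIfAbsent(key, value.getClass());
        }
      }
    }
    return types;
  }

  /** Throws if a sort-key column has a different type in the next batch than in the current one. */
  static void verifySameKeyTypes(Map<String, Class<?>> current, Map<String, Class<?>> next)
  {
    for (Map.Entry<String, Class<?>> e : next.entrySet()) {
      Class<?> expected = current.get(e.getKey());
      if (expected != null && !expected.equals(e.getValue())) {
        throw new IllegalStateException(
            "Sort key [" + e.getKey() + "] changed type from " + expected.getSimpleName()
            + " to " + e.getValue().getSimpleName() + "; CAST it to a single type in the query"
        );
      }
    }
  }
}
```

Only the sort-key columns are inspected, which matches the more lenient variant mentioned above; non-key columns may still differ between batches.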




[GitHub] [druid] paul-rogers commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1272688227

   The other thing to discuss is the ordered merge steps. As it turns out, the need to do the ordered merge is independent of how we do the sort. It doesn't matter if the cursor did the sort for us, or if we added a custom sort. In both cases, we have to generate a merge comparator that has the sort keys, with ASC/DESC sort sense, in the proper order.
   
   Merging requires two implementations: one for the "list" (i.e. list of map) format, the other for the "compact list" (i.e. list of array) format. We can assume that the compact-list rows for any one segment have the same set of columns, and that map-based rows have the same keys. That is, the set of column names will be whatever the query requests. The order of columns in the compact-list form _should_ (we should check) be driven by the order that columns appear in the query.
   
   But, we must anticipate that the _types_ of columns from different segments may differ. Druid allows column `c` to be a `long` in one segment, a `string` in another. I believe Druid has some rules for reconciling these type conflicts. This means that the comparator has to know how to compare `10` and `"foo"`. There may already be implementations for this in the existing comparators: we should check.
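
To make the two merge comparators concrete, here is a minimal sketch, not the PR's implementation. `MergeComparators` and `SortKey` are hypothetical names (`SortKey` merely stands in for the PR's `ScanQuery.OrderBy`), nulls sorting first is an assumption, and values of one column are assumed to be mutually comparable. A mixed `long`/`string` column would fail with a `ClassCastException`, which is exactly the type-conflict problem raised above.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: these names are not part of Druid.
class MergeComparators
{
  /** One ORDER BY term; a stand-in for the PR's ScanQuery.OrderBy. */
  static final class SortKey
  {
    final String column;
    final boolean descending;

    SortKey(String column, boolean descending)
    {
      this.column = column;
      this.descending = descending;
    }
  }

  @SuppressWarnings({"unchecked", "rawtypes"})
  private static int compareValues(Object a, Object b, boolean descending)
  {
    // Assumption of this sketch: nulls sort first in ascending order.
    int c;
    if (a == null || b == null) {
      c = (a == null ? 0 : 1) - (b == null ? 0 : 1);
    } else {
      c = ((Comparable) a).compareTo(b);
    }
    return descending ? -c : c;
  }

  /** Comparator for the "list" format: each row is a map keyed by column name. */
  static Comparator<Map<String, Object>> forMapRows(List<SortKey> keys)
  {
    return (left, right) -> {
      for (SortKey key : keys) {
        int c = compareValues(left.get(key.column), right.get(key.column), key.descending);
        if (c != 0) {
          return c;
        }
      }
      return 0;
    };
  }

  /** Comparator for the "compact list" format: each row is an array in query column order. */
  static Comparator<Object[]> forArrayRows(List<SortKey> keys, List<String> columns)
  {
    // Resolve each sort key to its array position once, up front.
    int[] positions = keys.stream().mapToInt(k -> columns.indexOf(k.column)).toArray();
    return (left, right) -> {
      for (int i = 0; i < positions.length; i++) {
        int c = compareValues(left[positions[i]], right[positions[i]], keys.get(i).descending);
        if (c != 0) {
          return c;
        }
      }
      return 0;
    };
  }
}
```

Both comparators share `compareValues`, so the ASC/DESC handling cannot drift between the two row formats.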




[GitHub] [druid] paul-rogers commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1272673941

   @599166320, you've taken on a helpful improvement in a complex area. Would be great if we could be in the same room and use a whiteboard to achieve a common understanding of how Druid works in this area, and the design of your improvement. The next best thing is for me to state the assumptions which I have, so we can see if we have that shared understanding.
   
   We need sorting to fit into the existing scan query structure. Here's my understanding of the gist of a scan query "logical plan", from the root (client) down to the leaf (scan):
   
   ```text
   1. Project scan rows into SQL rows.
   2. Unbatch `ScanResultValue` batches into rows.
   3. Broker: merge results from h historicals into a single sequence
   4. Historical: merge results from s segments into a single sequence
   5. Segment: merge results from c cursors into a single sequence
   6. Cursor: read r rows into b batches each of size < ~20K rows, so that b = ceil(r / 20K).
   ```
   
   If our query has no sort order, the results are unordered. In SQL, this means that the order is non-deterministic. So, the easy way to do the above merges is just a first-in-first out concatenation.
   
   In Druid, segments are always sorted by time. So, if our query is ordered by `__time`, then the logical plan would be:
   
   ```text
   3. Broker: ordered merge of results from h historicals: merging the rows from `ScanResultValue` batches to create new batches
   4. Historical: as above, for the s segments
   5. Segment: concatenate the results from the c cursors since, I believe, the cursors are already time-ordered
   6. Cursor: read r rows which are time ordered
   ```
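
The "ordered merge" in steps 3 and 4 is a k-way merge of inputs that are each already sorted. The sketch below shows the idea with plain Java collections; it is not Druid's `MergeSequence`, it materializes the result into a list rather than streaming it, and `OrderedMerge` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of a k-way ordered merge; not a Druid class.
class OrderedMerge
{
  /** Pairs an input iterator with the row it is currently positioned on. */
  private static final class Head<T>
  {
    final T row;
    final Iterator<T> rest;

    Head(T row, Iterator<T> rest)
    {
      this.row = row;
      this.rest = rest;
    }
  }

  /** Merges inputs that are each already sorted by {@code ordering} into one sorted list. */
  static <T> List<T> merge(List<? extends Iterable<T>> sortedInputs, Comparator<? super T> ordering)
  {
    // The heap holds one head row per non-exhausted input, smallest first.
    PriorityQueue<Head<T>> heads = new PriorityQueue<>((a, b) -> ordering.compare(a.row, b.row));
    for (Iterable<T> input : sortedInputs) {
      Iterator<T> it = input.iterator();
      if (it.hasNext()) {
        heads.add(new Head<>(it.next(), it));
      }
    }

    List<T> merged = new ArrayList<>();
    while (!heads.isEmpty()) {
      Head<T> smallest = heads.poll();
      merged.add(smallest.row);
      // Refill from the same input so every input keeps exactly one head in the heap.
      if (smallest.rest.hasNext()) {
        heads.add(new Head<>(smallest.rest.next(), smallest.rest));
      }
    }
    return merged;
  }
}
```

The same routine serves both the Historical level (s segments) and the Broker level (h Historicals); only the inputs differ.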
   




[GitHub] [druid] 599166320 commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
599166320 commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1279276589

   @paul-rogers @imply-cheddar 
   Thank you very much for your constructive ideas. Our thinking in this area has been further improved. I'm making some code adjustments based on your ideas, which should be completed soon.




[GitHub] [druid] 599166320 commented on a diff in pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
599166320 commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r994696868


##########
sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java:
##########
@@ -1307,7 +1308,7 @@ private ScanQuery toScanQuery()
         );
         return null;
       }
-    }
+    }*/

Review Comment:
   I have removed it





[GitHub] [druid] 599166320 commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
599166320 commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1277705153

   
   @paul-rogers 
   
   
   These tasks should now be complete. Could you please check whether I am missing anything?
   
   **1. Decide the level at which to sort. (Per-batch, per-cursor, or per-segment.)**
   
   _`ScanQueryOrderByRunner` and `ScanQueryOrderBySequence` should handle both batch-level and segment-level sorting._
   
   **2. Finish up the sorter for this case.**
   
   _`QueueBasedSorter` should have the characteristics of the sorter you describe._
   
   **3. Finish up the merge code.**
   
   _`ScanQueryOrderByMergeRowIterator` is applied to both historical-level and broker-level merges._
   
   **4. Test with a data set that spans multiple segments & cursors to ensure all the levels listed earlier work as expected.**
   
   _The test class `ScanQueryResultOrderingTest` should already satisfy the test needs you describe._




[GitHub] [druid] lgtm-com[bot] commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1277790112

   This pull request **fixes 1 alert** when merging cfbb668c1f94898944e54aee7a5b0699905dad15 into 45dfd679e92e668172c470c37615e7447b601af1 - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-68594e99c31ce27bbd85d1acd3be51a7b7f5b67f)
   
   **fixed alerts:**
   
   * 1 for Dereferenced variable may be null




[GitHub] [druid] imply-cheddar commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
imply-cheddar commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1278501898

   Thanks @paul-rogers for the write-up of our discussion. I do still think that some changes will be warranted in the `QueryRunnerFactory`, just fewer of them. Specific to `QueryRunnerFactory`, the code before this PR does a lot of work to organize the `QueryRunners` in time-interval order so that it can short-circuit logic. That logic only makes sense when the sort starts with `time`. The key, though, is that the merge logic can assume that it is working with results from the segments that are merged.
   
   To Paul's point, when we've decided which algorithm to use, we can be in a code path that only does that one thing.  So, one suggestion would be to maybe add a new method: `ScanQueryEngine.processWithMultiColumnSort`.  This method will assume that it was called because you want a multi-column sort and would then return a set of things that are sorted "properly".
   
   The primary complexity of my suggestion of building batches of pre-sorted rows is that if the query actually does want to scan all of the rows of the segment (i.e. there is no limit applied and the filters match millions), you will end up doing multiple passes over the segment (each pass would generate the next set of values).  This is an acceptable trade-off as it's okay for these types of queries to be slow (i.e. if you have a data set sorted by X and need it sorted by Y and it's billions of rows, sheer physics is going to dictate that the query is not necessarily going to be the fastest thing).
   
   In order to implement this, you will need to be able to keep track of the previous end point.  You will also need to "break ties" by using the rowId so that if there are multiple rows that share the same values for the sort keys, you can deterministically know which ones to include in which batch.
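
A rough illustration of the multi-pass idea, assuming a single `long` sort key and an in-memory array standing in for the segment: each pass rescans every row, skips everything at or before the previous end point, and keeps the next `batchSize` rows. `MultiPassSorter`, `Entry`, and `nextBatch` are hypothetical names, not part of the PR.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of multi-pass batch sorting; not a Druid class.
class MultiPassSorter
{
  /** A row's sort-key value plus its position in the segment. */
  static final class Entry
  {
    final long key;
    final int rowId;

    Entry(long key, int rowId)
    {
      this.key = key;
      this.rowId = rowId;
    }
  }

  // Orders by key, then by rowId, so rows with equal keys still have one well-defined position.
  static final Comparator<Entry> ORDER =
      Comparator.<Entry>comparingLong(e -> e.key).thenComparingInt(e -> e.rowId);

  /**
   * One pass over the "segment": returns the next {@code batchSize} rows after {@code previousEnd}
   * in sort order, or the first batch if {@code previousEnd} is null.
   */
  static List<Entry> nextBatch(long[] segment, Entry previousEnd, int batchSize)
  {
    // Max-heap holding the smallest batchSize qualifying rows seen so far.
    PriorityQueue<Entry> heap = new PriorityQueue<>(ORDER.reversed());
    for (int rowId = 0; rowId < segment.length; rowId++) {
      Entry candidate = new Entry(segment[rowId], rowId);
      if (previousEnd != null && ORDER.compare(candidate, previousEnd) <= 0) {
        continue; // already emitted by an earlier pass
      }
      heap.add(candidate);
      if (heap.size() > batchSize) {
        heap.poll(); // drop the largest, keeping the batchSize smallest
      }
    }
    List<Entry> batch = new ArrayList<>(heap);
    batch.sort(ORDER);
    return batch;
  }
}
```

A caller would pass the last entry of each batch as `previousEnd` for the next pass and stop when an empty batch comes back. Because the row id is part of the ordering, rows with duplicate keys are neither emitted twice nor skipped at a batch boundary.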
   
   Hopefully this all makes sense...




[GitHub] [druid] 599166320 commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
599166320 commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1273490010

   > 
   Consider queries whose `ORDER BY` starts with `__time` and whose other columns sort in the same direction as `__time`, as shown below:
   
   
   ```
   order by __time,a,b,c
   order by __time desc,a desc,b desc
   
   ```
   
   In the above two cases, I think we can consider taking a special path. The special path here is the `Segment by segment decision` you mentioned above.
   
   At present, I can think of two ways to do this:
   
   1. Let the user decide whether to take the special path by passing a parameter in the query context.
   
   2. Let Druid decide automatically whether to take the special path, based on the `dimensionsSpec` in the ingestion spec.
   
   For example:
   
   ```
   dimensionsSpec{dimensions:[a,b,c,d,e,f,....]}
   ```
   
   If the user's ingestion spec is configured as above, does Druid sort the ingested data? If the data is already sorted, the following queries can take the special path:
   
   ```
   order by __time,a,b,c
   order by __time desc,a desc,b desc
   ```
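
The shape test for such queries could be sketched as follows. It checks only the `ORDER BY` clause (first column is `__time`, every column sorts in the same direction). Whether the segment data is actually stored in that order is the open question above and is not answered by this code. `TimePrefixCheck`, `OrderBy`, and `isTimePrefixed` are hypothetical names.

```java
import java.util.List;

// Hypothetical sketch; OrderBy here is a stand-in, not the PR's ScanQuery.OrderBy.
class TimePrefixCheck
{
  static final class OrderBy
  {
    final String column;
    final boolean descending;

    OrderBy(String column, boolean descending)
    {
      this.column = column;
      this.descending = descending;
    }
  }

  /** True if the ordering starts with __time and all columns share __time's direction. */
  static boolean isTimePrefixed(List<OrderBy> orderBys)
  {
    if (orderBys.isEmpty() || !"__time".equals(orderBys.get(0).column)) {
      return false;
    }
    boolean timeDescending = orderBys.get(0).descending;
    return orderBys.stream().allMatch(o -> o.descending == timeDescending);
  }
}
```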
   
   




[GitHub] [druid] 599166320 commented on a diff in pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
599166320 commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r990634715


##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java:
##########
@@ -90,13 +92,27 @@ public QueryRunner<ScanResultValue> mergeRunners(
     // in single thread and in Jetty thread instead of processing thread
     return (queryPlus, responseContext) -> {
       ScanQuery query = (ScanQuery) queryPlus.getQuery();
-      ScanQuery.verifyOrderByForNativeExecution(query);
 
       // Note: this variable is effective only when queryContext has a timeout.
       // See the comment of ResponseContext.Key.TIMEOUT_AT.
       final long timeoutAt = System.currentTimeMillis() + QueryContexts.getTimeout(queryPlus.getQuery());
       responseContext.putTimeoutTime(timeoutAt);
 
+      if (query.scanOrderByNonTime()) {

Review Comment:
   `scanOrderByNonTime` is used by both `ScanQueryRunnerFactory` and `ScanQueryQueryToolChest.mergeResults`. I haven't thought of a good solution yet, so I have put it in `ScanQuery` for now. If there is a more appropriate place for it, please let me know.





[GitHub] [druid] paul-rogers commented on a diff in pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r990525034


##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java:
##########
@@ -234,6 +234,7 @@ private Object getColumnValue(int i)
                             final Object value;
 
                             if (legacy && allColumns.get(i).equals(LEGACY_TIMESTAMP_KEY)) {
+                              Preconditions.checkNotNull(selector);

Review Comment:
   This is OK, and will throw an NPE if the selector is null. But, the very next line will be pretty good at throwing an NPE anyway if the selector is null.





[GitHub] [druid] lgtm-com[bot] commented on pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on PR #13168:
URL: https://github.com/apache/druid/pull/13168#issuecomment-1314182407

   This pull request **introduces 1 alert** and **fixes 1** when merging c5b33d0d61b27e28d58a3a4c51b7e94b960e1b3c into a3edda3b634d00942f8729930dd45e2dd6b2eaff - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-881b918fbdbac8c69b6e86ed68172d8777df58db)
   
   **new alerts:**
   
   * 1 for Dereferenced variable may be null
   
   **fixed alerts:**
   
   * 1 for Dereferenced variable may be null
   ---
   
   **Heads-up: LGTM.com's PR analysis will be disabled on the 5th of December**, and LGTM.com will be shut down ⏻ completely on the 16th of December 2022. Please enable GitHub code scanning, which uses the same CodeQL engine :gear: that powers LGTM.com. For more information, please check out [our post on the GitHub blog](https://github.blog/2022-08-15-the-next-step-for-lgtm-com-github-code-scanning/).




[GitHub] [druid] github-code-scanning[bot] commented on a diff in pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
github-code-scanning[bot] commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r1056344932


##########
processing/src/main/java/org/apache/druid/collections/QueueBasedSorter.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.collections;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.MinMaxPriorityQueue;
+import com.google.common.collect.Ordering;
+import org.apache.druid.java.util.common.ISE;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * This sorter is applicable to two cases:
+ * 1.Result Set Merge
+ * 2.Sort the internal data of segment in the way of delayed materialization
+ */
+public class QueueBasedSorter<T> implements Sorter<T>
+{
+
+  private final MinMaxPriorityQueue<T[]> queue;
+
+  public QueueBasedSorter(int limit, Comparator<T[]> comparator)
+  {
+    this.queue = MinMaxPriorityQueue
+        .orderedBy(Ordering.from(comparator))
+        .maximumSize(limit)
+        .create();
+  }
+
+  public QueueBasedSorter(int limit, Ordering<T[]> ordering)
+  {
+    this.queue = MinMaxPriorityQueue
+        .orderedBy(ordering)
+        .maximumSize(limit)
+        .create();
+  }
+
+  @Override
+  public void add(T[] sorterElement)
+  {
+    try {
+      queue.offer(sorterElement);

Review Comment:
   ## Ignored error status of call
   
   Method add ignores exceptional return value of MinMaxPriorityQueue<T\[\]>.offer.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/3583)
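
For context on what a limit-bounded sorter does with each added element, here is a standard-library-only sketch. It uses `java.util.PriorityQueue` rather than Guava's `MinMaxPriorityQueue`, so it is an analogue of the technique and not the PR's `QueueBasedSorter`. `BoundedSorter` is a hypothetical name, and returning a boolean from `add` is one possible way to surface whether an element was retained, not something the PR does.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical stdlib-only analogue of a limit-bounded sorter; not the PR's class.
class BoundedSorter<T>
{
  private final int limit;
  private final Comparator<T> ordering;
  private final PriorityQueue<T> heap; // greatest retained element sits at the head

  BoundedSorter(int limit, Comparator<T> ordering)
  {
    if (limit <= 0) {
      throw new IllegalArgumentException("limit must be positive");
    }
    this.limit = limit;
    this.ordering = ordering;
    this.heap = new PriorityQueue<>(limit, ordering.reversed());
  }

  /** Returns true if the element is retained, false if it is rejected because the sorter is full. */
  boolean add(T element)
  {
    if (heap.size() < limit) {
      return heap.add(element);
    }
    if (ordering.compare(element, heap.peek()) >= 0) {
      return false; // not smaller than the current greatest element
    }
    heap.poll(); // evict the current greatest to make room
    return heap.add(element);
  }

  /** Returns the retained elements in ascending order. */
  List<T> sorted()
  {
    List<T> out = new ArrayList<>(heap);
    out.sort(ordering);
    return out;
  }
}
```

Memory stays bounded by `limit` no matter how many rows are offered, which is what makes this approach suitable for `ORDER BY ... LIMIT n` queries.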



##########
processing/src/test/java/org/apache/druid/query/scan/ScanQueryResultOrderingTest.java:
##########
@@ -303,6 +307,290 @@
     );
   }
 
+  @Test
+  public void testOrderIdAscending()
+  {
+    assertOrderByIdResultsEquals(
+        Druids.newScanQueryBuilder()
+              .dataSource("ds")
+              .intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2000/P1D"))))
+              .columns(ColumnHolder.TIME_COLUMN_NAME, ID_COLUMN)
+              .orderBy(ImmutableList.of(new ScanQuery.OrderBy(ID_COLUMN, ScanQuery.Order.ASCENDING)))
+              .build(),
+        ImmutableList.of(
+            7,
+            8,
+            9,
+            12,
+            22,
+            80,
+            101,
+            111,
+            222,
+            232,
+            333,
+            383,
+            411,
+            444,
+            555,
+            777,
+            808,
+            888,
+            999
+        )
+    );
+  }
+
+  @Test
+  public void testOrderDescending()
+  {
+    assertOrderByIdResultsEquals(
+        Druids.newScanQueryBuilder()
+              .dataSource("ds")
+              .intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2000/P1D"))))
+              .columns(ColumnHolder.TIME_COLUMN_NAME, ID_COLUMN)
+              .orderBy(ImmutableList.of(new ScanQuery.OrderBy(ID_COLUMN, ScanQuery.Order.DESCENDING)))
+              .build(),
+        ImmutableList.of(
+            999,
+            888,
+            808,
+            777,
+            555,
+            444,
+            411,
+            383,
+            333,
+            232,
+            222,
+            111,
+            101,
+            80,
+            22,
+            12,
+            9,
+            8,
+            7
+        )
+    );
+  }
+
+  @Test
+  public void testMultiColumnSort()
+  {
+    assertOrderByMultiColumnResultsEquals(
+        Druids.newScanQueryBuilder()
+              .dataSource("ds")
+              .intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2000/P1D"))))
+              .columns(ColumnHolder.TIME_COLUMN_NAME, ID_COLUMN, CODE_COLUMN)
+              .orderBy(ImmutableList.of(new ScanQuery.OrderBy(CODE_COLUMN, ScanQuery.Order.ASCENDING),
+                                        new ScanQuery.OrderBy(ID_COLUMN, ScanQuery.Order.DESCENDING)))
+              .build(),
+        ImmutableList.of(
+            new Object[]{999, "200"},
+            new Object[]{888, "200"},
+            new Object[]{383, "200"},
+            new Object[]{101, "200"},
+            new Object[]{8, "200"},
+            new Object[]{7, "200"},
+            new Object[]{444, "201"},
+            new Object[]{411, "201"},
+            new Object[]{80, "201"},
+            new Object[]{232, "206"},
+            new Object[]{12, "300"},
+            new Object[]{9, "400"},
+            new Object[]{555, "403"},
+            new Object[]{808, "404"},
+            new Object[]{111, "404"},
+            new Object[]{333, "500"},
+            new Object[]{22, "500"},
+            new Object[]{777, "501"},
+            new Object[]{222, "503"}
+        )
+    );
+  }
+
+  private void assertOrderByMultiColumnResultsEquals(final ScanQuery query, final List<Object[]> expectedResults)
+  {
+
+    final List<List<Pair<SegmentId, QueryRunner<ScanResultValue>>>> serverRunners = new ArrayList<>();
+    for (int i = 0; i <= segmentToServerMap.stream().max(Comparator.naturalOrder()).orElse(0); i++) {
+      serverRunners.add(new ArrayList<>());
+    }
+
+    for (int segmentNumber = 0; segmentNumber < segmentToServerMap.size(); segmentNumber++) {
+      final SegmentId segmentId = SEGMENTS.get(segmentNumber).getId();
+      final int serverNumber = segmentToServerMap.get(segmentNumber);
+
+      serverRunners.get(serverNumber).add(Pair.of(segmentId, segmentRunners.get(segmentNumber)));
+    }
+
+    // Simulates what the Historical servers would do.
+    final List<QueryRunner<ScanResultValue>> mergedServerRunners =
+        serverRunners.stream()
+                     .filter(runners -> !runners.isEmpty())
+                     .map(
+                         runners ->
+                             queryRunnerFactory.getToolchest().mergeResults(
+                                 new QueryRunner<ScanResultValue>()
+                                 {
+                                   @Override
+                                   public Sequence<ScanResultValue> run(
+                                       final QueryPlus<ScanResultValue> queryPlus,
+                                       final ResponseContext responseContext
+                                   )
+                                   {
+                                     return queryRunnerFactory.mergeRunners(
+                                         Execs.directExecutor(),
+                                         runners.stream().map(p -> p.rhs).collect(Collectors.toList())
+                                     ).run(
+                                         queryPlus.withQuery(
+                                             queryPlus.getQuery()
+                                                      .withQuerySegmentSpec(
+                                                          new MultipleSpecificSegmentSpec(
+                                                              runners.stream()
+                                                                     .map(p -> p.lhs.toDescriptor())
+                                                                     .collect(Collectors.toList())
+                                                          )
+                                                      )
+                                         ),
+                                         responseContext
+                                     );
+                                   }
+                                 }
+                             )
+                     )
+                     .collect(Collectors.toList());
+
+    // Simulates what the Broker would do.
+    final QueryRunner<ScanResultValue> brokerRunner = queryRunnerFactory.getToolchest().mergeResults(
+        (queryPlus, responseContext) -> {
+          final List<Sequence<ScanResultValue>> sequences =
+              mergedServerRunners.stream()
+                                 .map(runner -> runner.run(queryPlus.withoutThreadUnsafeState()))
+                                 .collect(Collectors.toList());
+
+          return new MergeSequence<>(
+              queryPlus.getQuery().getResultOrdering(),
+              Sequences.simple(sequences)
+          );
+        }
+    );
+
+    // Finally: run the query.
+    final List<Object[]> results = runMultiColumnQuery(
+        (ScanQuery) Druids.ScanQueryBuilder.copy(query)
+                                           .limit(limit <= 0 ? 10 : limit)
+                                           .batchSize(batchSize)
+                                           .build()
+                                           .withOverriddenContext(
+                                               ImmutableMap.of(
+                                                   ScanQueryConfig.CTX_KEY_MAX_ROWS_QUEUED_FOR_ORDERING,
+                                                   maxRowsQueuedForOrdering
+                                               )
+                                           ),
+        brokerRunner
+    );
+
+    Assert.assertEquals(
+        expectedResults.stream().limit(limit <= 0 ? 10 : limit).map(rs -> rs[0]).collect(Collectors.toList()),
+        results.stream().map(rs -> rs[0]).collect(Collectors.toList())
+    );
+
+    Assert.assertEquals(
+        expectedResults.stream().limit(limit <= 0 ? 10 : limit).map(rs -> rs[1]).collect(Collectors.toList()),
+        results.stream().map(rs -> rs[1]).collect(Collectors.toList())
+    );
+  }
+
+  private void assertOrderByIdResultsEquals(final ScanQuery query, final List<Integer> expectedResults)
+  {
+
+    final List<List<Pair<SegmentId, QueryRunner<ScanResultValue>>>> serverRunners = new ArrayList<>();
+    for (int i = 0; i <= segmentToServerMap.stream().max(Comparator.naturalOrder()).orElse(0); i++) {
+      serverRunners.add(new ArrayList<>());
+    }
+
+    for (int segmentNumber = 0; segmentNumber < segmentToServerMap.size(); segmentNumber++) {
+      final SegmentId segmentId = SEGMENTS.get(segmentNumber).getId();
+      final int serverNumber = segmentToServerMap.get(segmentNumber);
+
+      serverRunners.get(serverNumber).add(Pair.of(segmentId, segmentRunners.get(segmentNumber)));
+    }
+
+    // Simulates what the Historical servers would do.
+    final List<QueryRunner<ScanResultValue>> mergedServerRunners =
+        serverRunners.stream()
+                     .filter(runners -> !runners.isEmpty())
+                     .map(
+                         runners ->
+                             queryRunnerFactory.getToolchest().mergeResults(
+                                 new QueryRunner<ScanResultValue>()
+                                 {
+                                   @Override
+                                   public Sequence<ScanResultValue> run(
+                                       final QueryPlus<ScanResultValue> queryPlus,
+                                       final ResponseContext responseContext
+                                   )
+                                   {
+                                     return queryRunnerFactory.mergeRunners(
+                                         Execs.directExecutor(),
+                                         runners.stream().map(p -> p.rhs).collect(Collectors.toList())
+                                     ).run(

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking `QueryRunnerFactory.mergeRunners` should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/3580)



##########
processing/src/test/java/org/apache/druid/collections/SorterTests.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.collections;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.query.scan.ScanQuery;
+
+import javax.annotation.Nonnull;
+import java.util.Comparator;
+import java.util.List;
+
+public class SorterTests
+{
+
+  public void addData(Sorter<Integer> sorter, Integer[] datas)
+  {
+    sorter.add(datas);
+  }
+
+  @Nonnull
+  protected Comparator<Comparable[]> getMultiColumnSorterElementComparator(
+      List<String> orderByDirection,
+      List<Integer> orderByIdxs
+  )
+  {
+    Comparator<Comparable[]> comparator = (o1, o2) -> {
+      for (int i = 0; i < orderByIdxs.size(); i++) {
+        int compare;
+        if (ScanQuery.Order.ASCENDING.equals(ScanQuery.Order.fromString(orderByDirection.get(i)))) {
+          compare = Comparators.<Comparable>naturalNullsFirst()
+                               .compare(o1[orderByIdxs.get(i)], o2[orderByIdxs.get(i)]);
+        } else {
+          compare = Comparators.<Comparable>naturalNullsFirst()
+                               .compare(o2[orderByIdxs.get(i)], o1[orderByIdxs.get(i)]);
+        }
+        if (compare != 0) {
+          return compare;
+        }
+      }
+      return 0;
+    };
+    return comparator;
+  }
+
+  private Comparable[] getDatas(Comparable... datas)
+  {
+    return datas;
+  }
+
+  protected void singleColumnAscSortDatas(Sorter<Object> sorter, List<Integer> expectedValues)
+  {
+    sorter.add(getDatas(1, 1));
+    sorter.add(getDatas(2, 2));
+    sorter.add(getDatas(3, 3));
+    sorter.add(getDatas(4, 4));
+    sorter.add(getDatas(5, 5));
+    sorter.add(getDatas(6, 7));
+    sorter.add(getDatas(7, 8));
+    sorter.add(getDatas(1, 9));
+    sorter.add(getDatas(100, 0));
+    sorter.add(getDatas(1, 1));
+    sorter.add(getDatas(1, 3));
+    sorter.add(getDatas(9, 6));
+    sorter.add(getDatas(11, 6));
+
+    expectedValues.addAll(ImmutableList.of(100, 1, 2, 3, 4));
+  }
+
+  protected void singleColumnAscSortNaturalNullsFirstDatas(Sorter<Object> sorter, List<Integer> expectedValues)
+  {
+
+    sorter.add(getDatas(1, 1));
+    sorter.add(getDatas(2, 2));
+    sorter.add(getDatas(3, 3));
+    sorter.add(getDatas(4, 4));
+    sorter.add(getDatas(5, 5));
+    sorter.add(getDatas(6, 7));
+    sorter.add(getDatas(7, 8));
+    sorter.add(getDatas(1, 9));
+    sorter.add(getDatas(100, null));
+    sorter.add(getDatas(1, 1));
+    sorter.add(getDatas(1, 3));
+    sorter.add(getDatas(9, 6));
+    sorter.add(getDatas(11, 6));
+
+    expectedValues.addAll(ImmutableList.of(100, 1, 2, 3, 4));
+  }
+
+  protected void multiColumnSortDatas(Sorter<Object> sorter, List<Integer> expectedValues)
+  {
+    sorter.add(getDatas(1, 0, 0, 1));
+    sorter.add(getDatas(2, 0, 0, 2));
+    sorter.add(getDatas(3, 0, 0, 3));
+    sorter.add(getDatas(4, 0, 0, 4));
+    sorter.add(getDatas(5, 0, 3, 5));
+    sorter.add(getDatas(6, 0, 6, 7));
+    sorter.add(getDatas(7, 0, 0, 8));
+    sorter.add(getDatas(1, 0, 0, 9));
+    sorter.add(getDatas(100, 1, 0, 0));
+    sorter.add(getDatas(1, 0, 0, 1));
+    sorter.add(getDatas(1, 0, 0, 3));
+    sorter.add(getDatas(9, 0, 0, 6));
+    sorter.add(getDatas(11, 0, 0, 6));
+
+    expectedValues.addAll(ImmutableList.of(6, 5, 1, 7, 9));
+  }
+
+
+  protected void multiColumnSorWithNullDatas(Sorter<Object> sorter, List<Integer> expectedValues)
+  {
+    sorter.add(getDatas(1, 0, 0, 1));
+    sorter.add(getDatas(2, 0, 0, 2));
+    sorter.add(getDatas(3, 0, 0, 3));
+    sorter.add(getDatas(4, 0, 0, 4));
+    sorter.add(getDatas(5, 0, 3, 5));
+    sorter.add(getDatas(6, null, 6, 7));
+    sorter.add(getDatas(7, 0, 0, 8));
+    sorter.add(getDatas(1, 0, 0, 9));
+    sorter.add(getDatas(100, 1, 0, 0));
+    sorter.add(getDatas(1, 0, 0, 1));
+    sorter.add(getDatas(1, 0, 0, 3));
+    sorter.add(getDatas(9, 0, 0, 6));
+    sorter.add(getDatas(11, 0, 0, 6));
+
+    expectedValues.addAll(ImmutableList.of(6, 5, 1, 7));
+  }
+
+  protected void multiColumnSortCalssCastExceptionDatas(Sorter sorter)
+  {
+    sorter.add(getDatas(1, 0, 0, 1));
+    sorter.add(getDatas(2, 0, 0, 2));
+    ISE ise = null;

Review Comment:
   ## Unread local variable
   
   Variable 'ISE ise' is never read.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/3582)
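
The hunk above is cut off at `ISE ise = null;`, so what the test does with the variable is not visible here. As a hedged illustration only, the sketch below shows one way a test could capture the expected failure and actually read it. It uses only the JDK; `MultiColumnComparatorSketch`, `comparator`, and `captureMixedTypeFailure` are hypothetical names that do not appear in the PR. The comparator mirrors the nulls-first, per-column direction logic of `getMultiColumnSorterElementComparator`, with booleans standing in for `ScanQuery.Order`.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class MultiColumnComparatorSketch
{
  // Nulls sort before any non-null value; otherwise use natural ordering.
  @SuppressWarnings({"rawtypes", "unchecked"})
  private static int compareNullsFirst(Comparable a, Comparable b)
  {
    if (a == null) {
      return b == null ? 0 : -1;
    }
    if (b == null) {
      return 1;
    }
    // Throws ClassCastException when the two values have incompatible types.
    return a.compareTo(b);
  }

  // ascending.get(i) is the direction for the column at columnIdxs.get(i).
  @SuppressWarnings("rawtypes")
  public static Comparator<Comparable[]> comparator(List<Boolean> ascending, List<Integer> columnIdxs)
  {
    return (o1, o2) -> {
      for (int i = 0; i < columnIdxs.size(); i++) {
        final int idx = columnIdxs.get(i);
        // Descending order is obtained by swapping the operands.
        final int compare = ascending.get(i)
                            ? compareNullsFirst(o1[idx], o2[idx])
                            : compareNullsFirst(o2[idx], o1[idx]);
        if (compare != 0) {
          return compare;
        }
      }
      return 0;
    };
  }

  // Returns the exception raised by comparing an Integer column with a String
  // column, or null if no exception was thrown. The caller reads the result,
  // so the captured exception is never an unread local.
  @SuppressWarnings("rawtypes")
  public static ClassCastException captureMixedTypeFailure()
  {
    final Comparator<Comparable[]> cmp = comparator(Arrays.asList(true), Arrays.asList(0));
    try {
      cmp.compare(new Comparable[]{1}, new Comparable[]{"a"});
    }
    catch (ClassCastException e) {
      return e;
    }
    return null;
  }
}
```

In a JUnit test the same intent is usually expressed with `Assert.assertThrows`, which returns the thrown exception so that its type or message can be asserted on.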



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] github-code-scanning[bot] commented on a diff in pull request #13168: ScanQuery supports multi column orderBy queries

Posted by GitBox <gi...@apache.org>.
github-code-scanning[bot] commented on code in PR #13168:
URL: https://github.com/apache/druid/pull/13168#discussion_r1056440414


##########
processing/src/main/java/org/apache/druid/collections/QueueBasedSorter.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.collections;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.MinMaxPriorityQueue;
+import com.google.common.collect.Ordering;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * This sorter is applicable to two cases:
+ * 1.Result Set Merge
+ * 2.Sort the internal data of segment in the way of delayed materialization
+ */
+public class QueueBasedSorter<T> implements Sorter<T>
+{
+
+  private final MinMaxPriorityQueue<T[]> queue;
+
+  public QueueBasedSorter(int limit, Comparator<T[]> comparator)
+  {
+    this.queue = MinMaxPriorityQueue
+        .orderedBy(Ordering.from(comparator))
+        .maximumSize(limit)
+        .create();
+  }
+
+  public QueueBasedSorter(int limit, Ordering<T[]> ordering)
+  {
+    this.queue = MinMaxPriorityQueue
+        .orderedBy(ordering)
+        .maximumSize(limit)
+        .create();
+  }
+
+  @Override
+  public void add(T[] sorterElement)
+  {
+    queue.offer(sorterElement);

Review Comment:
   ## Ignored error status of call
   
   Method add ignores exceptional return value of MinMaxPriorityQueue<T\[\]>.offer.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/3586)
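
For context, Guava documents `MinMaxPriorityQueue.offer` as always returning `true`: a size-bounded queue evicts its greatest element after insertion, and that element may be the one just offered. The ignored return value therefore carries no information in `QueueBasedSorter.add`. The sketch below is not the PR's implementation. It is a JDK-only stand-in built on `java.util.PriorityQueue`, assuming the goal is to keep the `limit` smallest elements, and it shows a variant whose `add` does report whether the element was retained. `BoundedSorterSketch` and `drainSorted` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class BoundedSorterSketch<T>
{
  private final int limit;
  private final Comparator<T> comparator;
  // Reversed ordering makes the heap's head the greatest retained element.
  private final PriorityQueue<T> heap;

  public BoundedSorterSketch(int limit, Comparator<T> comparator)
  {
    if (limit <= 0) {
      throw new IllegalArgumentException("limit must be positive");
    }
    this.limit = limit;
    this.comparator = comparator;
    this.heap = new PriorityQueue<>(limit, comparator.reversed());
  }

  // Returns true if the element was retained, false if it was rejected
  // because it does not sort before the greatest retained element.
  public boolean add(T element)
  {
    if (heap.size() < limit) {
      return heap.offer(element);
    }
    if (comparator.compare(element, heap.peek()) >= 0) {
      return false;
    }
    // Evict the current greatest element to make room for the new one.
    heap.poll();
    return heap.offer(element);
  }

  // Returns the retained elements in comparator order and empties the sorter.
  public List<T> drainSorted()
  {
    final List<T> out = new ArrayList<>(heap);
    out.sort(comparator);
    heap.clear();
    return out;
  }
}
```

Unlike the Guava queue, this variant rejects a tie with the current greatest element instead of inserting it and then evicting one of the two. The retained values are the same either way.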


