Posted to commits@druid.apache.org by fj...@apache.org on 2019/04/08 03:12:58 UTC

[incubator-druid] branch master updated: Allow max rows and max segments for time-ordered scans to be overridden using the scan query JSON spec (#7413)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 799c66d  Allow max rows and max segments for time-ordered scans to be overridden using the scan query JSON spec (#7413)
799c66d is described below

commit 799c66d9ac74019e6da64f4cd31118b9ad12b38e
Author: Justin Borromeo <jb...@edu.uwaterloo.ca>
AuthorDate: Sun Apr 7 20:12:52 2019 -0700

    Allow max rows and max segments for time-ordered scans to be overridden using the scan query JSON spec (#7413)
    
    * Initial changes
    
    * Fixed NPEs
    
    * Fixed failing spec test
    
    * Fixed failing Calcite test
    
    * Move configs to context
    
    * Validated and added docs
    
    * fixed weird indentation
    
    * Update default context vals in doc
    
    * Fixed allowable values
---
 docs/content/querying/scan-query.md                | 31 +++++++++++--
 .../org/apache/druid/query/scan/ScanQuery.java     | 54 ++++++++++++++++++++--
 .../apache/druid/query/scan/ScanQueryConfig.java   |  3 ++
 .../druid/query/scan/ScanQueryRunnerFactory.java   | 11 +++--
 .../apache/druid/sql/calcite/CalciteQueryTest.java |  1 -
 5 files changed, 88 insertions(+), 12 deletions(-)

diff --git a/docs/content/querying/scan-query.md b/docs/content/querying/scan-query.md
index ee44681..7ba5c60 100644
--- a/docs/content/querying/scan-query.md
+++ b/docs/content/querying/scan-query.md
@@ -63,7 +63,7 @@ The following are the main parameters for Scan queries:
 |limit|How many rows to return. If not specified, all rows will be returned.|no|
 |order|The ordering of returned rows based on timestamp.  "ascending", "descending", and "none" (default) are supported.  Currently, "ascending" and "descending" are only supported for queries where the limit is less than `druid.query.scan.maxRowsQueuedForOrdering`.  Scan queries that either run in legacy mode or have a limit greater than `druid.query.scan.maxRowsQueuedForOrdering` will not be time-ordered and default to an order of "none".|none|
 |legacy|Return results consistent with the legacy "scan-query" contrib extension. Defaults to the value set by `druid.query.scan.legacy`, which in turn defaults to false. See [Legacy mode](#legacy-mode) for details.|no|
-|context|An additional JSON Object which can be used to specify certain flags.|no|
+|context|An additional JSON Object which can be used to specify certain flags (see the Query Context Properties section below).|no|
 
 ## Example results
 
@@ -179,7 +179,9 @@ decompression and decoding buffers for each.  The `druid.query.scan.maxSegmentPa
 from this by capping the number of partitions opened at any time when time ordering is used.
 
 Both `druid.query.scan.maxRowsQueuedForOrdering` and `druid.query.scan.maxSegmentPartitionsOrderedInMemory` are 
-configurable and can be tuned based on hardware specs and number of dimensions being queried.
+configurable and can be tuned based on hardware specs and the number of dimensions being queried.  These config properties
+can also be overridden using the `maxRowsQueuedForOrdering` and `maxSegmentPartitionsOrderedInMemory` properties in 
+the query context (see the Query Context Properties section).
   
 ## Legacy mode
 
@@ -198,8 +200,27 @@ is complete.
 
 ## Configuration Properties
 
+Configuration properties:
+
+|property|description|values|default|
+|--------|-----------|------|-------|
+|druid.query.scan.maxRowsQueuedForOrdering|The maximum number of rows returned when time ordering is used|An integer in [1, 2147483647]|100000|
+|druid.query.scan.maxSegmentPartitionsOrderedInMemory|The maximum number of segments scanned per historical when time ordering is used|An integer in [1, 2147483647]|50|
+|druid.query.scan.legacy|Whether legacy mode should be turned on for Scan queries|true or false|false|
+
+
+## Query Context Properties
+
 |property|description|values|default|
 |--------|-----------|------|-------|
-|druid.query.scan.maxRowsQueuedForOrdering|The maximum number of rows returned when time ordering is used|An integer in [0, 2147483647]|100000|
-|druid.query.scan.maxSegmentPartitionsOrderedInMemory|The maximum number of segments scanned per historical when time ordering is used|An integer in [0, 2147483647]|50|
-|druid.query.scan.legacy|Whether legacy mode should be turned on for Scan queries|true or false|false|
\ No newline at end of file
+|maxRowsQueuedForOrdering|The maximum number of rows returned when time ordering is used.  Overrides the identically named config.|An integer in [1, 2147483647]|`druid.query.scan.maxRowsQueuedForOrdering`|
+|maxSegmentPartitionsOrderedInMemory|The maximum number of segments scanned per historical when time ordering is used.  Overrides the identically named config.|An integer in [1, 2147483647]|`druid.query.scan.maxSegmentPartitionsOrderedInMemory`|
+
+Sample query context JSON object:
+
+```json
+{
+  "maxRowsQueuedForOrdering": 100001,
+  "maxSegmentPartitionsOrderedInMemory": 100
+}
+```
diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java
index d68f2c8..2b6fc82 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java
@@ -20,6 +20,7 @@
 package org.apache.druid.query.scan;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonValue;
 import com.google.common.base.Preconditions;
@@ -34,6 +35,7 @@ import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.query.spec.QuerySegmentSpec;
 import org.apache.druid.segment.VirtualColumns;
 
+import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -112,6 +114,8 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
   private final List<String> columns;
   private final Boolean legacy;
   private final Order order;
+  private final Integer maxRowsQueuedForOrdering;
+  private final Integer maxSegmentPartitionsOrderedInMemory;
 
   @JsonCreator
   public ScanQuery(
@@ -132,13 +136,43 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
     this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns);
     this.resultFormat = (resultFormat == null) ? ResultFormat.RESULT_FORMAT_LIST : resultFormat;
     this.batchSize = (batchSize == 0) ? 4096 * 5 : batchSize;
+    Preconditions.checkArgument(
+        this.batchSize > 0,
+        "batchSize must be greater than 0"
+    );
     this.limit = (limit == 0) ? Long.MAX_VALUE : limit;
-    Preconditions.checkArgument(this.batchSize > 0, "batchSize must be greater than 0");
-    Preconditions.checkArgument(this.limit > 0, "limit must be greater than 0");
+    Preconditions.checkArgument(
+        this.limit > 0,
+        "limit must be greater than 0"
+    );
     this.dimFilter = dimFilter;
     this.columns = columns;
     this.legacy = legacy;
-    this.order = order == null ? Order.NONE : order;
+    this.order = (order == null) ? Order.NONE : order;
+    this.maxRowsQueuedForOrdering = validateAndGetMaxRowsQueuedForOrdering();
+    this.maxSegmentPartitionsOrderedInMemory = validateAndGetMaxSegmentPartitionsOrderedInMemory();
+  }
+
+  private Integer validateAndGetMaxRowsQueuedForOrdering()
+  {
+    final Integer maxRowsQueuedForOrdering =
+        getContextValue(ScanQueryConfig.CTX_KEY_MAX_ROWS_QUEUED_FOR_ORDERING, null);
+    Preconditions.checkArgument(
+        maxRowsQueuedForOrdering == null || maxRowsQueuedForOrdering > 0,
+        "maxRowsQueuedForOrdering must be greater than 0"
+    );
+    return maxRowsQueuedForOrdering;
+  }
+
+  private Integer validateAndGetMaxSegmentPartitionsOrderedInMemory()
+  {
+    final Integer maxSegmentPartitionsOrderedInMemory =
+        getContextValue(ScanQueryConfig.CTX_KEY_MAX_SEGMENT_PARTITIONS_FOR_ORDERING, null);
+    Preconditions.checkArgument(
+        maxSegmentPartitionsOrderedInMemory == null || maxSegmentPartitionsOrderedInMemory > 0,
+        "maxSegmentPartitionsOrderedInMemory must be greater than 0"
+    );
+    return maxSegmentPartitionsOrderedInMemory;
   }
 
   @JsonProperty
@@ -171,6 +205,20 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
     return order;
   }
 
+  @Nullable
+  @JsonIgnore
+  public Integer getMaxRowsQueuedForOrdering()
+  {
+    return maxRowsQueuedForOrdering;
+  }
+
+  @Nullable
+  @JsonIgnore
+  public Integer getMaxSegmentPartitionsOrderedInMemory()
+  {
+    return maxSegmentPartitionsOrderedInMemory;
+  }
+
   @Override
   public boolean hasFilters()
   {
diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryConfig.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryConfig.java
index 8121f47..b926972 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryConfig.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryConfig.java
@@ -25,6 +25,9 @@ import java.util.Objects;
 
 public class ScanQueryConfig
 {
+  public static final String CTX_KEY_MAX_ROWS_QUEUED_FOR_ORDERING = "maxRowsQueuedForOrdering";
+  public static final String CTX_KEY_MAX_SEGMENT_PARTITIONS_FOR_ORDERING = "maxSegmentPartitionsOrderedInMemory";
+
   @JsonProperty
   private boolean legacy = false;
 
diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
index 41729f7..5f49a66 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
@@ -128,8 +128,10 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
           descriptorsOrdered = Lists.reverse(descriptorsOrdered);
           queryRunnersOrdered = Lists.reverse(queryRunnersOrdered);
         }
-
-        if (query.getLimit() <= scanQueryConfig.getMaxRowsQueuedForOrdering()) {
+        int maxRowsQueuedForOrdering = (query.getMaxRowsQueuedForOrdering() == null
+                                        ? scanQueryConfig.getMaxRowsQueuedForOrdering()
+                                        : query.getMaxRowsQueuedForOrdering());
+        if (query.getLimit() <= maxRowsQueuedForOrdering) {
           // Use priority queue strategy
           return priorityQueueSortAndLimit(
               Sequences.concat(Sequences.map(
@@ -172,7 +174,10 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
                                          .max(Comparator.comparing(Integer::valueOf))
                                          .get();
 
-          if (maxNumPartitionsInSegment <= scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory()) {
+          int segmentPartitionLimit = (query.getMaxSegmentPartitionsOrderedInMemory() == null
+                                       ? scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory()
+                                       : query.getMaxSegmentPartitionsOrderedInMemory());
+          if (maxNumPartitionsInSegment <= segmentPartitionLimit) {
             // Use n-way merge strategy
 
             // Create a list of grouped runner lists (i.e. each sublist/"runner group" corresponds to an interval) ->
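The override precedence applied in the two hunks above (a non-null query-context value wins over the server-wide config default) can be sketched in isolation. This is a minimal, self-contained sketch; `OverridePrecedence` and `resolve` are hypothetical names that mirror the null-coalescing logic in the patch, not part of the Druid codebase.

```java
// Hypothetical sketch of the context-over-config precedence used in this patch:
// a nullable per-query context value overrides the server-wide default when present.
public class OverridePrecedence
{
  // Mirrors: query.getMaxRowsQueuedForOrdering() == null
  //   ? scanQueryConfig.getMaxRowsQueuedForOrdering()
  //   : query.getMaxRowsQueuedForOrdering()
  static int resolve(Integer queryContextValue, int configDefault)
  {
    return queryContextValue == null ? configDefault : queryContextValue;
  }

  public static void main(String[] args)
  {
    // No context override: the config default (100000) applies.
    System.out.println(resolve(null, 100000));   // prints 100000
    // Context override present: it takes precedence over the default.
    System.out.println(resolve(100001, 100000)); // prints 100001
  }
}
```

The same two-line pattern is applied once for `maxRowsQueuedForOrdering` and once for `maxSegmentPartitionsOrderedInMemory` in `ScanQueryRunnerFactory`.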
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index d23d995..2a9049d 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -85,7 +85,6 @@ import java.util.List;
 
 public class CalciteQueryTest extends BaseCalciteQueryTest
 {
-
   @Test
   public void testSelectConstantExpression() throws Exception
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org