You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2019/04/08 03:12:58 UTC
[incubator-druid] branch master updated: Allow max rows and max
segments for time-ordered scans to be overridden using the scan query JSON
spec (#7413)
This is an automated email from the ASF dual-hosted git repository.
fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 799c66d Allow max rows and max segments for time-ordered scans to be overridden using the scan query JSON spec (#7413)
799c66d is described below
commit 799c66d9ac74019e6da64f4cd31118b9ad12b38e
Author: Justin Borromeo <jb...@edu.uwaterloo.ca>
AuthorDate: Sun Apr 7 20:12:52 2019 -0700
Allow max rows and max segments for time-ordered scans to be overridden using the scan query JSON spec (#7413)
* Initial changes
* Fixed NPEs
* Fixed failing spec test
* Fixed failing Calcite test
* Move configs to context
* Validated and added docs
* fixed weird indentation
* Update default context vals in doc
* Fixed allowable values
---
docs/content/querying/scan-query.md | 31 +++++++++++--
.../org/apache/druid/query/scan/ScanQuery.java | 54 ++++++++++++++++++++--
.../apache/druid/query/scan/ScanQueryConfig.java | 3 ++
.../druid/query/scan/ScanQueryRunnerFactory.java | 11 +++--
.../apache/druid/sql/calcite/CalciteQueryTest.java | 1 -
5 files changed, 88 insertions(+), 12 deletions(-)
diff --git a/docs/content/querying/scan-query.md b/docs/content/querying/scan-query.md
index ee44681..7ba5c60 100644
--- a/docs/content/querying/scan-query.md
+++ b/docs/content/querying/scan-query.md
@@ -63,7 +63,7 @@ The following are the main parameters for Scan queries:
|limit|How many rows to return. If not specified, all rows will be returned.|no|
|order|The ordering of returned rows based on timestamp. "ascending", "descending", and "none" (default) are supported. Currently, "ascending" and "descending" are only supported for queries where the limit is less than `druid.query.scan.maxRowsQueuedForOrdering`. Scan queries that are either legacy mode or have a limit greater than `druid.query.scan.maxRowsQueuedForOrdering` will not be time-ordered and default to an order of "none".|none|
|legacy|Return results consistent with the legacy "scan-query" contrib extension. Defaults to the value set by `druid.query.scan.legacy`, which in turn defaults to false. See [Legacy mode](#legacy-mode) for details.|no|
-|context|An additional JSON Object which can be used to specify certain flags.|no|
+|context|An additional JSON Object which can be used to specify certain flags (see the Query Context Properties section below).|no|
## Example results
@@ -179,7 +179,9 @@ decompression and decoding buffers for each. The `druid.query.scan.maxSegmentPa
from this by capping the number of partitions opened at any time when time ordering is used.
Both `druid.query.scan.maxRowsQueuedForOrdering` and `druid.query.scan.maxSegmentPartitionsOrderedInMemory` are
-configurable and can be tuned based on hardware specs and number of dimensions being queried.
+configurable and can be tuned based on hardware specs and number of dimensions being queried. These config properties
+can also be overridden using the `maxRowsQueuedForOrdering` and `maxSegmentPartitionsOrderedInMemory` properties in
+the query context (see the Query Context Properties section).
## Legacy mode
@@ -198,8 +200,27 @@ is complete.
## Configuration Properties
+Configuration properties:
+
+|property|description|values|default|
+|--------|-----------|------|-------|
+|druid.query.scan.maxRowsQueuedForOrdering|The maximum number of rows returned when time ordering is used|An integer in [1, 2147483647]|100000|
+|druid.query.scan.maxSegmentPartitionsOrderedInMemory|The maximum number of segments scanned per historical when time ordering is used|An integer in [1, 2147483647]|50|
+|druid.query.scan.legacy|Whether legacy mode should be turned on for Scan queries|true or false|false|
+
+
+## Query Context Properties
+
|property|description|values|default|
|--------|-----------|------|-------|
-|druid.query.scan.maxRowsQueuedForOrdering|The maximum number of rows returned when time ordering is used|An integer in [0, 2147483647]|100000|
-|druid.query.scan.maxSegmentPartitionsOrderedInMemory|The maximum number of segments scanned per historical when time ordering is used|An integer in [0, 2147483647]|50|
-|druid.query.scan.legacy|Whether legacy mode should be turned on for Scan queries|true or false|false|
\ No newline at end of file
+|maxRowsQueuedForOrdering|The maximum number of rows returned when time ordering is used. Overrides the identically named config.|An integer in [1, 2147483647]|`druid.query.scan.maxRowsQueuedForOrdering`|
+|maxSegmentPartitionsOrderedInMemory|The maximum number of segments scanned per historical when time ordering is used. Overrides the identically named config.|An integer in [1, 2147483647]|`druid.query.scan.maxSegmentPartitionsOrderedInMemory`|
+
+Sample query context JSON object:
+
+```json
+{
+ "maxRowsQueuedForOrdering": 100001,
+ "maxSegmentPartitionsOrderedInMemory": 100
+}
+```
diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java
index d68f2c8..2b6fc82 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java
@@ -20,6 +20,7 @@
package org.apache.druid.query.scan;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Preconditions;
@@ -34,6 +35,7 @@ import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.VirtualColumns;
+import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -112,6 +114,8 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
private final List<String> columns;
private final Boolean legacy;
private final Order order;
+ private final Integer maxRowsQueuedForOrdering;
+ private final Integer maxSegmentPartitionsOrderedInMemory;
@JsonCreator
public ScanQuery(
@@ -132,13 +136,43 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns);
this.resultFormat = (resultFormat == null) ? ResultFormat.RESULT_FORMAT_LIST : resultFormat;
this.batchSize = (batchSize == 0) ? 4096 * 5 : batchSize;
+ Preconditions.checkArgument(
+ this.batchSize > 0,
+ "batchSize must be greater than 0"
+ );
this.limit = (limit == 0) ? Long.MAX_VALUE : limit;
- Preconditions.checkArgument(this.batchSize > 0, "batchSize must be greater than 0");
- Preconditions.checkArgument(this.limit > 0, "limit must be greater than 0");
+ Preconditions.checkArgument(
+ this.limit > 0,
+ "limit must be greater than 0"
+ );
this.dimFilter = dimFilter;
this.columns = columns;
this.legacy = legacy;
- this.order = order == null ? Order.NONE : order;
+ this.order = (order == null) ? Order.NONE : order;
+ this.maxRowsQueuedForOrdering = validateAndGetMaxRowsQueuedForOrdering();
+ this.maxSegmentPartitionsOrderedInMemory = validateAndGetMaxSegmentPartitionsOrderedInMemory();
+ }
+
+ private Integer validateAndGetMaxRowsQueuedForOrdering()
+ {
+ final Integer maxRowsQueuedForOrdering =
+ getContextValue(ScanQueryConfig.CTX_KEY_MAX_ROWS_QUEUED_FOR_ORDERING, null);
+ Preconditions.checkArgument(
+ maxRowsQueuedForOrdering == null || maxRowsQueuedForOrdering > 0,
+ "maxRowsQueuedForOrdering must be greater than 0"
+ );
+ return maxRowsQueuedForOrdering;
+ }
+
+ private Integer validateAndGetMaxSegmentPartitionsOrderedInMemory()
+ {
+ final Integer maxSegmentPartitionsOrderedInMemory =
+ getContextValue(ScanQueryConfig.CTX_KEY_MAX_SEGMENT_PARTITIONS_FOR_ORDERING, null);
+ Preconditions.checkArgument(
+ maxSegmentPartitionsOrderedInMemory == null || maxSegmentPartitionsOrderedInMemory > 0,
+      "maxSegmentPartitionsOrderedInMemory must be greater than 0"
+ );
+ return maxSegmentPartitionsOrderedInMemory;
}
@JsonProperty
@@ -171,6 +205,20 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
return order;
}
+ @Nullable
+ @JsonIgnore
+ public Integer getMaxRowsQueuedForOrdering()
+ {
+ return maxRowsQueuedForOrdering;
+ }
+
+ @Nullable
+ @JsonIgnore
+ public Integer getMaxSegmentPartitionsOrderedInMemory()
+ {
+ return maxSegmentPartitionsOrderedInMemory;
+ }
+
@Override
public boolean hasFilters()
{
diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryConfig.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryConfig.java
index 8121f47..b926972 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryConfig.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryConfig.java
@@ -25,6 +25,9 @@ import java.util.Objects;
public class ScanQueryConfig
{
+ public static final String CTX_KEY_MAX_ROWS_QUEUED_FOR_ORDERING = "maxRowsQueuedForOrdering";
+ public static final String CTX_KEY_MAX_SEGMENT_PARTITIONS_FOR_ORDERING = "maxSegmentPartitionsOrderedInMemory";
+
@JsonProperty
private boolean legacy = false;
diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
index 41729f7..5f49a66 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
@@ -128,8 +128,10 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
descriptorsOrdered = Lists.reverse(descriptorsOrdered);
queryRunnersOrdered = Lists.reverse(queryRunnersOrdered);
}
-
- if (query.getLimit() <= scanQueryConfig.getMaxRowsQueuedForOrdering()) {
+ int maxRowsQueuedForOrdering = (query.getMaxRowsQueuedForOrdering() == null
+ ? scanQueryConfig.getMaxRowsQueuedForOrdering()
+ : query.getMaxRowsQueuedForOrdering());
+ if (query.getLimit() <= maxRowsQueuedForOrdering) {
// Use priority queue strategy
return priorityQueueSortAndLimit(
Sequences.concat(Sequences.map(
@@ -172,7 +174,10 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
.max(Comparator.comparing(Integer::valueOf))
.get();
- if (maxNumPartitionsInSegment <= scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory()) {
+ int segmentPartitionLimit = (query.getMaxSegmentPartitionsOrderedInMemory() == null
+ ? scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory()
+ : query.getMaxSegmentPartitionsOrderedInMemory());
+ if (maxNumPartitionsInSegment <= segmentPartitionLimit) {
// Use n-way merge strategy
// Create a list of grouped runner lists (i.e. each sublist/"runner group" corresponds to an interval) ->
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index d23d995..2a9049d 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -85,7 +85,6 @@ import java.util.List;
public class CalciteQueryTest extends BaseCalciteQueryTest
{
-
@Test
public void testSelectConstantExpression() throws Exception
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org